[jira] [Created] (FLINK-35600) Data read duplication during the full-to-incremental conversion phase

2024-06-13 Thread Di Wu (Jira)
Di Wu created FLINK-35600:
-

 Summary: Data read duplication during the full-to-incremental 
conversion phase
 Key: FLINK-35600
 URL: https://issues.apache.org/jira/browse/FLINK-35600
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: Di Wu


Assume that the table has been split into 3 chunks.

Timeline
t1: chunk1 is read
t2: a record A belonging to chunk2 is inserted into MySQL
t3: chunk2 is read, and record A has already been sent downstream
t4: chunk3 is read

At this point, startOffset is set to the low watermark
t5: BinlogSplitReader.pollSplitRecords receives record A again and uses the 
shouldEmit method to decide whether to send it downstream

In this method:
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // the existing tables that have finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
    return false;
} {code}

*maxSplitHighWatermarkMap.get(tableId)* returns a high watermark that carries 
no ts_sec value, so ts_sec defaults to 0; as a result, 
*position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true.

*Record A is emitted downstream a second time, so the data is duplicated.*
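
A minimal, self-contained sketch of the faulty comparison (hypothetical toy 
code; the real *BinlogOffset.isAtOrAfter* also compares binlog file name, 
position and GTID set, not just the timestamp):
{code:java}
// Toy model of the bug: the stored high watermark carries no ts_sec, so it
// defaults to 0, and any real binlog event timestamp compares as "at or after".
public class WatermarkSketch {
    // simplified stand-in for the timestamp part of BinlogOffset.isAtOrAfter
    static boolean isAtOrAfter(long eventTsSec, long watermarkTsSec) {
        return eventTsSec >= watermarkTsSec;
    }

    public static void main(String[] args) {
        long highWatermarkTsSec = 0L;       // ts_sec missing -> defaults to 0
        long recordATsSec = 1_718_236_800L; // binlog timestamp of record A
        // prints "true": record A is wrongly treated as pure-binlog-phase data
        // and emitted downstream a second time
        System.out.println(isAtOrAfter(recordATsSec, highWatermarkTsSec));
    }
}
{code}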





[jira] [Updated] (FLINK-35600) Data read duplication during the full-to-incremental conversion phase

2024-06-13 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-35600:
--
Description: 
Assume that the table has been split into 3 chunks.

Timeline
t1: chunk1 is read
t2: a record A belonging to chunk2 is inserted into MySQL
t3: chunk2 is read, and record A has already been sent downstream
t4: chunk3 is read

At this point, startOffset is set to the low watermark
t5: *BinlogSplitReader.pollSplitRecords* receives record A again and uses the 
*shouldEmit* method to decide whether to send it downstream

In this method:
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // the existing tables that have finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
    return false;
} {code}
*maxSplitHighWatermarkMap.get(tableId)* returns a high watermark that carries 
no ts_sec value, so ts_sec defaults to 0; as a result, 
*position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true.

*Record A is emitted downstream a second time, so the data is duplicated.*

  was:
Assume that the table has been split into 3 chunks.

Timeline
t1: chunk1 is read
t2: a record A belonging to chunk2 is inserted into MySQL
t3: chunk2 is read, and record A has already been sent downstream
t4: chunk3 is read

At this point, startOffset is set to the low watermark
t5: BinlogSplitReader.pollSplitRecords receives record A again and uses the 
shouldEmit method to decide whether to send it downstream

In this method:
{code:java}
private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
    if (pureBinlogPhaseTables.contains(tableId)) {
        return true;
    }
    // the existing tables that have finished snapshot reading
    if (maxSplitHighWatermarkMap.containsKey(tableId)
            && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
        pureBinlogPhaseTables.add(tableId);
        return true;
    }
    return false;
} {code}

*maxSplitHighWatermarkMap.get(tableId)* returns a high watermark that carries 
no ts_sec value, so ts_sec defaults to 0; as a result, 
*position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true.

*Record A is emitted downstream a second time, so the data is duplicated.*


> Data read duplication during the full-to-incremental conversion phase
> -
>
> Key: FLINK-35600
> URL: https://issues.apache.org/jira/browse/FLINK-35600
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Di Wu
>Priority: Major
>  Labels: pull-request-available
>
> Assume that the table has been split into 3 chunks.
> Timeline
> t1: chunk1 is read
> t2: a record A belonging to chunk2 is inserted into MySQL
> t3: chunk2 is read, and record A has already been sent downstream
> t4: chunk3 is read
> At this point, startOffset is set to the low watermark
> t5: *BinlogSplitReader.pollSplitRecords* receives record A again and uses the 
> *shouldEmit* method to decide whether to send it downstream
> In this method:
> {code:java}
> private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
>     if (pureBinlogPhaseTables.contains(tableId)) {
>         return true;
>     }
>     // the existing tables that have finished snapshot reading
>     if (maxSplitHighWatermarkMap.containsKey(tableId)
>             && position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
>         pureBinlogPhaseTables.add(tableId);
>         return true;
>     }
>     return false;
> } {code}
> *maxSplitHighWatermarkMap.get(tableId)* returns a high watermark that carries 
> no ts_sec value, so ts_sec defaults to 0; as a result, 
> *position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))* evaluates to true.
> *Record A is emitted downstream a second time, so the data is duplicated.*





[jira] [Created] (FLINK-35838) FLIP-399: Flink Connector Doris

2024-07-15 Thread Di Wu (Jira)
Di Wu created FLINK-35838:
-

 Summary: FLIP-399: Flink Connector Doris
 Key: FLINK-35838
 URL: https://issues.apache.org/jira/browse/FLINK-35838
 Project: Flink
  Issue Type: New Feature
Reporter: Di Wu


As discussed on the Flink dev mailing list [1][2], we should finish the repo 
and doc migration as soon as possible.

[1] https://lists.apache.org/thread/w3hoglk0pqbzqhzlfcgzkkz3xrwo90rt
[2] https://lists.apache.org/thread/b32qvhzpmq06z2x5s9c8qr3pzsnld34m





[jira] [Created] (FLINK-35839) Migrate repo from apache/doris to apache/flink

2024-07-15 Thread Di Wu (Jira)
Di Wu created FLINK-35839:
-

 Summary: Migrate repo from apache/doris to apache/flink
 Key: FLINK-35839
 URL: https://issues.apache.org/jira/browse/FLINK-35839
 Project: Flink
  Issue Type: Sub-task
Reporter: Di Wu








[jira] [Created] (FLINK-35840) Add documentation for Doris

2024-07-15 Thread Di Wu (Jira)
Di Wu created FLINK-35840:
-

 Summary: Add documentation for Doris
 Key: FLINK-35840
 URL: https://issues.apache.org/jira/browse/FLINK-35840
 Project: Flink
  Issue Type: Sub-task
Reporter: Di Wu








[jira] [Created] (FLINK-35841) Change package name to org.apache.flink

2024-07-15 Thread Di Wu (Jira)
Di Wu created FLINK-35841:
-

 Summary: Change package name to org.apache.flink
 Key: FLINK-35841
 URL: https://issues.apache.org/jira/browse/FLINK-35841
 Project: Flink
  Issue Type: Sub-task
Reporter: Di Wu








[jira] [Created] (FLINK-35842) Change the FlinkCDC dependency to 3.1.x

2024-07-15 Thread Di Wu (Jira)
Di Wu created FLINK-35842:
-

 Summary: Change the FlinkCDC dependency to 3.1.x
 Key: FLINK-35842
 URL: https://issues.apache.org/jira/browse/FLINK-35842
 Project: Flink
  Issue Type: Sub-task
Reporter: Di Wu








[jira] [Created] (FLINK-36631) Support reading incremental data from Oracle starting from a specified SCN

2024-10-30 Thread Di Wu (Jira)
Di Wu created FLINK-36631:
-

 Summary: Support reading incremental data from Oracle starting from a 
specified SCN
 Key: FLINK-36631
 URL: https://issues.apache.org/jira/browse/FLINK-36631
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: 3.0.0
Reporter: Di Wu


OracleCDC currently only supports the initial and latest-offset startup modes. 
Add a specific-offset mode to support reading incremental data from a 
specified SCN.
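
A hypothetical usage sketch of the proposed mode (assumption: the builder calls 
below mirror the existing incremental-source style; *StartupOptions.specificOffset* 
taking an SCN does not exist yet and is shown only to illustrate the idea):
{code:java}
// Hypothetical sketch only: start the Oracle source from a given SCN instead
// of an initial snapshot or the latest offset. API names are assumptions.
OracleIncrementalSource<String> source =
        OracleSourceBuilder.OracleIncrementalSource.<String>builder()
                .hostname("localhost")
                .port(1521)
                .databaseList("ORCLCDB")
                .schemaList("DEBEZIUM")
                .tableList("DEBEZIUM.PRODUCTS")
                .username("flinkuser")
                .password("flinkpw")
                // proposed: begin reading redo-log data at SCN 3000241
                .startupOptions(StartupOptions.specificOffset("3000241"))
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
{code}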





[jira] [Updated] (FLINK-36649) When reading from Oracle via OracleIncrementalSource, the connection is occasionally closed

2024-11-03 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36649:
--
Description: 
When reading from Oracle via OracleIncrementalSource, the connection is 
occasionally closed.

 
{code:java}
14:57:56,432 INFO  
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader 
[Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: 
Print to Std. Out (1/1)#0] [] - Close snapshot reader 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection                              
[pool-14-thread-1] [] - Connection gracefully closed
14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper         
[debezium-snapshot-reader-0] [] - Mining session stopped due to the 
java.sql.SQLException: 关闭的 Resultset: getLong
14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler                            
[debezium-snapshot-reader-0] [] - Producer failure
java.sql.SQLException: 关闭的 Resultset: getLong
    at 
oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254)
 ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
    at 
io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373)
 ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) 
~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) 
~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
    at 
io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372)
 ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
    at 
io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353)
 ~[flink-connector-oracle-cdc/:?]
    at 
io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258)
 ~[flink-connector-oracle-cdc/:?]
    at 
org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139)
 ~[flink-connector-oracle-cdc/:?]
    at 
org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106)
 ~[flink-connector-oracle-cdc/:?]
    at 
org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112)
 ~[flink-cdc-base/:?]
    at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99)
 ~[flink-cdc-base/:?]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_322]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_322]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322] {code}
 

*Reason:*

After a split has been read, the split reader is closed; at that point, 
LogMinerStreamingChangeEventSource still calls captureSessionMemoryStatistics 
to collect session statistics.

Finally, in the code below:

 
{code:java}
public <T> T queryAndMap(String query, StatementFactory statementFactory,
        ResultSetMapper<T> mapper) throws SQLException {
    Objects.requireNonNull(mapper, "Mapper must be provided");
    Connection conn = connection();  // checks whether the connection is still open
    try (Statement statement = statementFactory.createStatement(conn)) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("running '{}'", query);
        }
        try (ResultSet resultSet = statement.executeQuery(query)) {
            // by the time execution reaches here, the split reader may have
            // closed the connection, and this call throws
            return mapper.apply(resultSet);
        }
    }
} {code}
 

*Solution:*

-1. We could recreate the connection before calling 
*captureSessionMemoryStatistics(connection)*, but this is time-consuming; in my 
local test it took 6 seconds.-

2. Since *captureSessionMemoryStatistics* only collects statistics, it can be 
moved before {*}process{*}, which ensures the connection is no longer in use by 
the time the split reader closes it (see the sketch below).
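
A self-contained toy sketch of option 2 (all names hypothetical; it models only 
the ordering, not the real Debezium classes):
{code:java}
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model: if statistics are captured after the processing loop, the split
// reader may already have closed the connection and the capture fails;
// capturing before the loop is always safe.
public class ReorderingSketch {
    static final AtomicBoolean connectionOpen = new AtomicBoolean(true);

    static long captureSessionMemoryStatistics() {
        if (!connectionOpen.get()) {
            // mimics "java.sql.SQLException: closed ResultSet: getLong"
            throw new IllegalStateException("closed ResultSet: getLong");
        }
        return 42L; // pretend session memory statistic
    }

    static void process() {
        // stands in for the LogMiner loop; once the split is fully read,
        // the split reader closes the shared connection
        connectionOpen.set(false);
    }

    public static void main(String[] args) {
        long stats = captureSessionMemoryStatistics(); // option 2: capture first
        process();
        System.out.println("session memory statistic = " + stats);
        // calling captureSessionMemoryStatistics() here instead would throw
    }
}
{code}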

 

 

  was:
When reading from Oracle via OracleIncrementalSource, the connection is 
occasionally closed.

 
{code:java}
14:57:56,432 INFO  
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader 
[Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: 
Print to Std. Out (1/1)#0] [] - Close snapshot reader 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
14:57:56,597 INFO  io.debezium.jdbc

[jira] [Updated] (FLINK-36649) When reading from Oracle via OracleIncrementalSource, the connection is occasionally closed

2024-11-01 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36649:
--
Fix Version/s: 3.0.0

> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Di Wu
>Priority: Major
>  Labels: CDC
> Fix For: 3.0.0
>
>






[jira] [Updated] (FLINK-36649) When reading from Oracle via OracleIncrementalSource, the connection is occasionally closed

2024-11-01 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36649:
--
Description: 
When reading from Oracle via OracleIncrementalSource, the connection is 
occasionally closed.

 

 

> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Di Wu
>Priority: Major
>  Labels: CDC
> Fix For: cdc-3.3.0
>
>
> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed.
>  
>  





[jira] [Updated] (FLINK-36687) Improve Doris Pipeline Connector Version to 24.0.1

2024-11-10 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36687:
--
Description: 
Update Doris Pipeline Connector version to 24.0.1.

Related release notes: 
[apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
[apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]

  was:Update Doris Pipeline Connector Version to 24.0.1


> Improve Doris Pipeline Connector Version to 24.0.1
> --
>
> Key: FLINK-36687
> URL: https://issues.apache.org/jira/browse/FLINK-36687
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: Di Wu
>Priority: Major
>  Labels: pull-request-available
>
> Update Doris Pipeline Connector version to 24.0.1.
> Related release notes: 
> [apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
> [apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]





[jira] [Updated] (FLINK-36687) Improve Doris Pipeline Connector Version to 24.0.1

2024-11-10 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36687:
--
Description: 
Update Doris Pipeline Connector version to 24.0.1. Related release notes: 
[apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
[apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]

  was:
Update Doris Pipeline Connector version to 24.0.1.

Related release notes: 
[apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
[apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]


> Improve Doris Pipeline Connector Version to 24.0.1
> --
>
> Key: FLINK-36687
> URL: https://issues.apache.org/jira/browse/FLINK-36687
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: Di Wu
>Priority: Major
>  Labels: pull-request-available
>
> Update Doris Pipeline Connector version to 24.0.1. Related release notes: 
> [apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
> [apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]





[jira] [Closed] (FLINK-36687) Improve Doris Pipeline Connector Version to 24.0.1

2024-11-11 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu closed FLINK-36687.
-
Resolution: Resolved

> Improve Doris Pipeline Connector Version to 24.0.1
> --
>
> Key: FLINK-36687
> URL: https://issues.apache.org/jira/browse/FLINK-36687
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: Di Wu
>Priority: Major
>  Labels: pull-request-available
>
> Update Doris Pipeline Connector version to 24.0.1. Related release notes: 
> [apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
> [apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]





[jira] [Commented] (FLINK-36687) Improve Doris Pipeline Connector Version to 24.0.1

2024-11-11 Thread Di Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897140#comment-17897140
 ] 

Di Wu commented on FLINK-36687:
---

This has been resolved by https://github.com/apache/flink-cdc/pull/3691

> Improve Doris Pipeline Connector Version to 24.0.1
> --
>
> Key: FLINK-36687
> URL: https://issues.apache.org/jira/browse/FLINK-36687
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: Di Wu
>Priority: Major
>  Labels: pull-request-available
>
> Update Doris Pipeline Connector version to 24.0.1. Related release notes: 
> [apache/doris-flink-connector#477|https://github.com/apache/doris-flink-connector/issues/477]
> [apache/doris-flink-connector#499|https://github.com/apache/doris-flink-connector/issues/499]





[jira] [Created] (FLINK-36687) Improve Doris Pipeline Connector Version to 24.0.1

2024-11-10 Thread Di Wu (Jira)
Di Wu created FLINK-36687:
-

 Summary: Improve Doris Pipeline Connector Version to 24.0.1
 Key: FLINK-36687
 URL: https://issues.apache.org/jira/browse/FLINK-36687
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: Di Wu


Update Doris Pipeline Connector Version to 24.0.1





[jira] [Commented] (FLINK-36397) Using the offset obtained after a query transaction as a high watermark cannot ensure exactly-once semantics.

2024-11-11 Thread Di Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17897115#comment-17897115
 ] 

Di Wu commented on FLINK-36397:
---

In the snapshot reader, the data between the low watermark and the high 
watermark is replayed, so this should not cause data loss.

> Using the offset obtained after a query transaction as a high watermark 
> cannot ensure exactly-once semantics.
> -
>
> Key: FLINK-36397
> URL: https://issues.apache.org/jira/browse/FLINK-36397
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: Zhongmin Qiao
>Assignee: Zhongmin Qiao
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.1
>
> Attachments: picture1.png
>
>
> !picture1.png|width=564,height=357!
> Using the offset obtained after a query transaction as a high watermark 
> cannot ensure exactly-once semantics because "show master status" and the 
> query action are not in the same transaction. There may be data inserted 
> between the query action and the retrieval of the high watermark. As a 
> result, these data will be lost since we only deliver data after the high 
> watermark during the binlog phase.





[jira] [Updated] (FLINK-36649) When reading from Oracle via OracleIncrementalSource, the connection is occasionally closed

2024-11-01 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36649:
--
Fix Version/s: cdc-3.3.0
   (was: 3.0.0)

> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Di Wu
>Priority: Major
>  Labels: CDC
> Fix For: cdc-3.3.0
>
>






[jira] [Updated] (FLINK-36649) When reading from Oracle via OracleIncrementalSource, the connection is occasionally closed

2024-11-01 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36649:
--
Labels: CDC  (was: )

> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Di Wu
>Priority: Major
>  Labels: CDC
>






[jira] [Updated] (FLINK-36649) When reading from Oracle via OracleIncrementalSource, the connection is occasionally closed

2024-11-01 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36649:
--
Affects Version/s: cdc-3.1.1
   cdc-3.2.0
   cdc-3.3.0

> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0, cdc-3.1.1, cdc-3.3.0
>Reporter: Di Wu
>Priority: Major
>  Labels: CDC, pull-request-available
>
> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed.
>  
> {code:java}
> 14:57:56,432 INFO  
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader
>  [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> 
> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
> 14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection                            
>   [pool-14-thread-1] [] - Connection gracefully closed
> 14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper       
>   [debezium-snapshot-reader-0] [] - Mining session stopped due to the 
> java.sql.SQLException: 关闭的 Resultset: getLong
> 14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler                          
>   [debezium-snapshot-reader-0] [] - Producer failure
> java.sql.SQLException: 关闭的 Resultset: getLong
>     at 
> oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254)
>  ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
>     at 
> io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373)
>  ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) 
> ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) 
> ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at 
> io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372)
>  ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
>     at 
> io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112)
>  ~[flink-cdc-base/:?]
>     at 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99)
>  ~[flink-cdc-base/:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_322]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_322]
>     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322] {code}
>  
> *Reason:*
> After a split has been read, the split reader is closed; at that point, 
> LogMinerStreamingChangeEventSource still calls captureSessionMemoryStatistics 
> to collect session statistics.
> Finally, in the code below:
>  
> {code:java}
> public <T> T queryAndMap(String query, StatementFactory statementFactory,
>         ResultSetMapper<T> mapper) throws SQLException {
>     Objects.requireNonNull(mapper, "Mapper must be provided");
>     Connection conn = connection();  // checks whether the connection is still open
>     try (Statement statement = statementFactory.createStatement(conn)) {
>         if (LOGGER.isTraceEnabled()) {
>             LOGGER.trace("running '{}'", query);
>         }
>         try (ResultSet resultSet = statement.executeQuery(query)) {
>             // by the time execution reaches here, the split reader may have
>             // closed the connection, and this call throws
>             return mapper.apply(resultSet);
>         }
>     }
> } {code}
>  
> *Solution:*
> -1. We could recreate the connection before calling 
> *captureSessionMemoryStatist

[jira] [Updated] (FLINK-36649) When reading from Oracle via OracleIncrementalSource, the connection is occasionally closed

2024-11-01 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36649:
--
Description: 
When reading from Oracle via OracleIncrementalSource, the connection is 
occasionally closed.

 
{code:java}
14:57:56,432 INFO  
org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader 
[Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> Sink: 
Print to Std. Out (1/1)#0] [] - Close snapshot reader 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection                              
[pool-14-thread-1] [] - Connection gracefully closed
14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper         
[debezium-snapshot-reader-0] [] - Mining session stopped due to the 
java.sql.SQLException: 关闭的 Resultset: getLong
14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler                            
[debezium-snapshot-reader-0] [] - Producer failure
java.sql.SQLException: 关闭的 Resultset: getLong
    at 
oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254)
 ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
    at 
io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373)
 ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) 
~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
    at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) 
~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
    at 
io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372)
 ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
    at 
io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353)
 ~[flink-connector-oracle-cdc/:?]
    at 
io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258)
 ~[flink-connector-oracle-cdc/:?]
    at 
org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139)
 ~[flink-connector-oracle-cdc/:?]
    at 
org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106)
 ~[flink-connector-oracle-cdc/:?]
    at 
org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112)
 ~[flink-cdc-base/:?]
    at 
org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99)
 ~[flink-cdc-base/:?]
    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
[?:1.8.0_322]
    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
[?:1.8.0_322]
    at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322] {code}
 

*Reason:*

After a split has been read, the split reader is closed; at that point, 
LogMinerStreamingChangeEventSource still calls captureSessionMemoryStatistics 
to collect session statistics.

Finally, in the code below:

 
{code:java}
public <T> T queryAndMap(String query, StatementFactory statementFactory,
        ResultSetMapper<T> mapper) throws SQLException {
    Objects.requireNonNull(mapper, "Mapper must be provided");
    Connection conn = connection();  // checks whether the connection is still open
    try (Statement statement = statementFactory.createStatement(conn)) {
        if (LOGGER.isTraceEnabled()) {
            LOGGER.trace("running '{}'", query);
        }
        try (ResultSet resultSet = statement.executeQuery(query)) {
            // by the time execution reaches here, the split reader may have
            // closed the connection, and this call throws
            return mapper.apply(resultSet);
        }
    }
} {code}
 

*Solution:*

-1. We could recreate the connection before calling 
*captureSessionMemoryStatistics(connection)*, but this is time-consuming; in my 
local test it took 6 seconds.-

2. Since *captureSessionMemoryStatistics* only collects statistics, it can be 
moved before process, which ensures the connection is no longer in use by the 
time the split reader closes it.

 

 

  was:
When reading from Oracle via OracleIncrementalSource, the connection is 
occasionally closed.

 

 


> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
>  Issue Type: Bug
>  Components: Fli

[jira] [Updated] (FLINK-36649) When reading from Oracle via OracleIncrementalSource, the connection is occasionally closed

2024-11-01 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36649:
--
Fix Version/s: (was: cdc-3.3.0)

> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed
> --
>
> Key: FLINK-36649
> URL: https://issues.apache.org/jira/browse/FLINK-36649
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Di Wu
>Priority: Major
>  Labels: CDC, pull-request-available
>
> When reading from Oracle via OracleIncrementalSource, the connection is 
> occasionally closed.
>  
> {code:java}
> 14:57:56,432 INFO  
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader
>  [Source Data Fetcher for Source: doris_source[1] -> SinkConversion[2] -> 
> Sink: Print to Std. Out (1/1)#0] [] - Close snapshot reader 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher
> 14:57:56,597 INFO  io.debezium.jdbc.JdbcConnection                            
>   [pool-14-thread-1] [] - Connection gracefully closed
> 14:57:56,602 ERROR io.debezium.connector.oracle.logminer.LogMinerHelper       
>   [debezium-snapshot-reader-0] [] - Mining session stopped due to the 
> java.sql.SQLException: 关闭的 Resultset: getLong
> 14:57:56,603 ERROR io.debezium.pipeline.ErrorHandler                          
>   [debezium-snapshot-reader-0] [] - Producer failure
> java.sql.SQLException: 关闭的 Resultset: getLong
>     at 
> oracle.jdbc.driver.GeneratedScrollableResultSet.getLong(GeneratedScrollableResultSet.java:254)
>  ~[ojdbc8-19.3.0.0.jar:19.3.0.0.0]
>     at 
> io.debezium.connector.oracle.OracleConnection.lambda$getSessionStatisticByName$10(OracleConnection.java:373)
>  ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:642) 
> ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at io.debezium.jdbc.JdbcConnection.queryAndMap(JdbcConnection.java:510) 
> ~[debezium-core-1.9.8.Final.jar:1.9.8.Final]
>     at 
> io.debezium.connector.oracle.OracleConnection.getSessionStatisticByName(OracleConnection.java:372)
>  ~[debezium-connector-oracle-1.9.8.Final.jar:1.9.8.Final]
>     at 
> io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.captureSessionMemoryStatistics(LogMinerStreamingChangeEventSource.java:353)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource.execute(LogMinerStreamingChangeEventSource.java:258)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask$RedoLogSplitReadTask.execute(OracleStreamFetchTask.java:139)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask.executeBackfillTask(OracleScanFetchTask.java:106)
>  ~[flink-connector-oracle-cdc/:?]
>     at 
> org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:112)
>  ~[flink-cdc-base/:?]
>     at 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:99)
>  ~[flink-cdc-base/:?]
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  [?:1.8.0_322]
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  [?:1.8.0_322]
>     at java.lang.Thread.run(Thread.java:750) [?:1.8.0_322] {code}
>  
> *Reason:*
> After a split has been read, the split reader is closed; at that point, 
> LogMinerStreamingChangeEventSource still calls captureSessionMemoryStatistics 
> to collect session statistics.
> Finally, in the code below:
>  
> {code:java}
> public <T> T queryAndMap(String query, StatementFactory statementFactory,
>         ResultSetMapper<T> mapper) throws SQLException {
>     Objects.requireNonNull(mapper, "Mapper must be provided");
>     Connection conn = connection();  // checks whether the connection is still open
>     try (Statement statement = statementFactory.createStatement(conn)) {
>         if (LOGGER.isTraceEnabled()) {
>             LOGGER.trace("running '{}'", query);
>         }
>         try (ResultSet resultSet = statement.executeQuery(query)) {
>             // by the time execution reaches here, the split reader may have
>             // closed the connection, and this call throws
>             return mapper.apply(resultSet);
>         }
>     }
> } {code}
>  
> *Solution:*
> -1. We could recreate the connection before calling 
> *captureSessionMemoryStatistics(connection)*, but this is time-consuming; in 
> my local test it took 6 seconds.-
> 2. Since *cap

[jira] [Created] (FLINK-36649) When reading from Oracle via OracleIncrementalSource, the connection is occasionally closed

2024-11-01 Thread Di Wu (Jira)
Di Wu created FLINK-36649:
-

 Summary: When reading from Oracle via OracleIncrementalSource, the 
connection is occasionally closed
 Key: FLINK-36649
 URL: https://issues.apache.org/jira/browse/FLINK-36649
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Di Wu








[jira] [Updated] (FLINK-36631) Support reading incremental data from Oracle starting from a specified SCN

2024-11-01 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36631:
--
Affects Version/s: cdc-3.1.1
   cdc-3.2.0
   cdc-3.1.0
   (was: 3.0.0)

> Support reading incremental data from Oracle starting from a specified SCN
> --
>
> Key: FLINK-36631
> URL: https://issues.apache.org/jira/browse/FLINK-36631
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: Di Wu
>Assignee: Di Wu
>Priority: Major
>  Labels: CDC, pull-request-available
>
> OracleCDC currently only supports the initial and latest-offset startup 
> modes. Add a specific-offset mode to support reading incremental data from a 
> specified SCN.





[jira] [Updated] (FLINK-36813) MySQLCDC supports synchronization of specified fields

2024-11-27 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36813:
--
Description: 
*Background*
In some scenarios, users want MySQL synchronization to cover only specified 
fields rather than all fields in the table:
1. The user only has permission for some fields in MySQL
2. A single table has too many fields, and the user only wants to synchronize 
some of them; see for example 
[https://github.com/apache/flink-cdc/discussions/3058]

*Current situation*
For the incremental stage, configuring Debezium's column.include.list property 
is enough to synchronize only some fields; refer to 
[https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list]

For the full snapshot stage, "*" is currently used in 
{_}MySqlSnapshotSplitReadTask{_}; refer to:
{code:java}
if (isScanningData) {
    return buildSelectWithRowLimits(
            tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
}
{code}
 

*Solution*
We can follow Debezium's 
[RelationalSnapshotChangeEventSource|https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java#L752-L776]:
the user configures column.include.list, MySqlSnapshotSplitReadTask captures 
the matching columns, and the column list is spliced into the scan SQL, as 
sketched below.
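
A minimal, self-contained sketch of the splicing step (the helper below is 
hypothetical and is not the actual MySqlSnapshotSplitReadTask code):
{code:java}
import java.util.List;

public class ProjectionSketch {
    // Hypothetical helper: build the snapshot SELECT with an explicit column
    // list derived from column.include.list, falling back to "*" when no
    // include list is configured.
    static String buildSnapshotSelect(String tableId, List<String> columns, String condition) {
        String projection = columns.isEmpty() ? "*" : String.join(", ", columns);
        StringBuilder sql = new StringBuilder("SELECT ").append(projection)
                .append(" FROM ").append(tableId);
        if (condition != null) {
            sql.append(" WHERE ").append(condition);
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildSnapshotSelect(
                "db.orders", List.of("id", "amount"), "id >= 1 AND id < 1024"));
        // prints: SELECT id, amount FROM db.orders WHERE id >= 1 AND id < 1024
    }
}
{code}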

  was:
*Background*
In some scenarios, users want MySQL synchronization to cover only specified 
fields rather than all fields in the table:
1. The user only has permission for some fields in MySQL
2. A single table has too many fields, and the user only wants to synchronize 
some of them; see for example 
https://github.com/apache/flink-cdc/discussions/3058

*Current situation*
For the incremental stage, configuring Debezium's column.include.list property 
is enough to synchronize only some fields; refer to 
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list

For the full snapshot stage, "*" is currently used in 
{_}MySqlSnapshotSplitReadTask{_}; refer to:
{code:java}
if (isScanningData) {
    return buildSelectWithRowLimits(
            tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
}
{code}
 

*Solution*
The user configures {_}column.include.list{_}, MySqlSnapshotSplitReadTask 
captures the matching columns, and the column list is spliced into the scan 
SQL.


> MySQLCDC supports synchronization of specified fields
> -
>
> Key: FLINK-36813
> URL: https://issues.apache.org/jira/browse/FLINK-36813
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.1
>Reporter: Di Wu
>Priority: Major
>  Labels: CDC, pull-request-available
> Fix For: cdc-3.3.0
>
>
> *Background*
> In some scenarios, users want MySQL synchronization to cover only specified 
> fields rather than all fields in the table:
> 1. The user only has permission for some fields in MySQL
> 2. A single table has too many fields, and the user only wants to synchronize 
> some of them; see for example 
> [https://github.com/apache/flink-cdc/discussions/3058]
> *Current situation*
> For the incremental stage, configuring Debezium's column.include.list 
> property is enough to synchronize only some fields; refer to 
> [https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list]
> For the full snapshot stage, "*" is currently used in 
> {_}MySqlSnapshotSplitReadTask{_}; refer to:
> {code:java}
> if (isScanningData) {
>     return buildSelectWithRowLimits(
>             tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
> }
> {code}
>  
> *Solution*
> We can follow Debezium's 
> [RelationalSnapshotChangeEventSource|https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java#L752-L776]:
> the user configures column.include.list, MySqlSnapshotSplitReadTask captures 
> the matching columns, and the column list is spliced into the scan SQL.





[jira] [Created] (FLINK-36813) MySQLCDC supports synchronization of specified fields

2024-11-27 Thread Di Wu (Jira)
Di Wu created FLINK-36813:
-

 Summary: MySQLCDC supports synchronization of specified fields
 Key: FLINK-36813
 URL: https://issues.apache.org/jira/browse/FLINK-36813
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Affects Versions: cdc-3.2.1
Reporter: Di Wu
 Fix For: cdc-3.3.0


*Background*
In some scenarios, users want MySQL synchronization to cover only specified 
fields rather than all fields in the table:
1. The user only has permission for some fields in MySQL
2. A single table has too many fields, and the user only wants to synchronize 
some of them; see for example 
https://github.com/apache/flink-cdc/discussions/3058

*Current situation*
For the incremental stage, configuring Debezium's column.include.list property 
is enough to synchronize only some fields; refer to 
https://debezium.io/documentation/reference/1.9/connectors/mysql.html#mysql-property-column-include-list

For the full snapshot stage, "*" is currently used in 
{_}MySqlSnapshotSplitReadTask{_}; refer to:
{code:java}
if (isScanningData) {
    return buildSelectWithRowLimits(
            tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
}
{code}
 

*Solution*
The user configures {_}column.include.list{_}, MySqlSnapshotSplitReadTask 
captures the matching columns, and the column list is spliced into the scan 
SQL.





[jira] [Updated] (FLINK-36631) Support reading incremental data from Oracle starting from a specified SCN

2025-01-22 Thread Di Wu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Wu updated FLINK-36631:
--
Fix Version/s: cdc-3.4.0

> Support reading incremental data from Oracle starting from a specified SCN
> --
>
> Key: FLINK-36631
> URL: https://issues.apache.org/jira/browse/FLINK-36631
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1
>Reporter: Di Wu
>Assignee: Di Wu
>Priority: Major
>  Labels: CDC, pull-request-available
> Fix For: cdc-3.4.0
>
>
> OracleCDC currently only supports the initial and latest-offset startup 
> modes. Add a specific-offset mode to support reading incremental data from a 
> specified SCN.


