This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fa13965020 [INLONG-8436][Sort] Fix the backfill task not running bug 
in oracle cdc connector (#8437)
fa13965020 is described below

commit fa139650206bd65cd2b8982d295bb065bd4dc391
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Wed Aug 16 11:25:45 2023 +0800

    [INLONG-8436][Sort] Fix the backfill task not running bug in oracle cdc 
connector (#8437)
---
 .../source/reader/fetch/OracleScanFetchTask.java   | 53 ++++++++++++----------
 .../source/reader/fetch/OracleStreamFetchTask.java | 10 ++--
 2 files changed, 34 insertions(+), 29 deletions(-)

diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java
index eb7111a8c7..ef12d46842 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java
@@ -114,17 +114,17 @@ public class OracleScanFetchTask implements 
FetchTask<SourceSplitBase> {
                 snapshotSplitReadTask.execute(
                         changeEventSourceContext, 
sourceFetchContext.getOffsetContext());
 
-        final StreamSplit backfillBinlogSplit =
+        final StreamSplit backfillRedoLogSplit =
                 createBackfillRedoLogSplit(changeEventSourceContext);
-        // optimization that skip the binlog read when the low watermark 
equals high
+        // optimization that skip the redo log read when the low watermark 
equals high
         // watermark
-        final boolean binlogBackfillRequired =
-                backfillBinlogSplit
+        final boolean redoLogBackfillRequired =
+                backfillRedoLogSplit
                         .getEndingOffset()
-                        .isAfter(backfillBinlogSplit.getStartingOffset());
-        if (!binlogBackfillRequired) {
-            dispatchBinlogEndEvent(
-                    backfillBinlogSplit,
+                        .isAfter(backfillRedoLogSplit.getStartingOffset());
+        if (!redoLogBackfillRequired) {
+            dispatchRedoLogEndEvent(
+                    backfillRedoLogSplit,
                     ((OracleSourceFetchTaskContext) 
context).getOffsetContext().getPartition(),
                     ((OracleSourceFetchTaskContext) context).getDispatcher());
             taskRunning = false;
@@ -132,11 +132,16 @@ public class OracleScanFetchTask implements 
FetchTask<SourceSplitBase> {
         }
         // execute redoLog read task
         if (snapshotResult.isCompletedOrSkipped()) {
-            final RedoLogSplitReadTask backfillBinlogReadTask =
-                    createBackfillRedoLogReadTask(backfillBinlogSplit, 
sourceFetchContext);
-            backfillBinlogReadTask.execute(
-                    new SnapshotBinlogSplitChangeEventSourceContext(),
-                    sourceFetchContext.getOffsetContext());
+            final RedoLogSplitReadTask backfillRedoLogReadReTask =
+                    createBackfillRedoLogReadTask(backfillRedoLogSplit, 
sourceFetchContext);
+            final LogMinerOracleOffsetContextLoader loader =
+                    new LogMinerOracleOffsetContextLoader(
+                            ((OracleSourceFetchTaskContext) 
context).getDbzConnectorConfig());
+            final OracleOffsetContext oracleOffsetContext =
+                    
loader.load(backfillRedoLogSplit.getStartingOffset().getOffset());
+            backfillRedoLogReadReTask.execute(
+                    new SnapshotStreamSplitChangeEventSourceContext(), 
oracleOffsetContext);
+            taskRunning = false;
         } else {
             taskRunning = false;
             throw new IllegalStateException(
@@ -156,13 +161,13 @@ public class OracleScanFetchTask implements 
FetchTask<SourceSplitBase> {
     }
 
     private RedoLogSplitReadTask createBackfillRedoLogReadTask(
-            StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext 
context) {
+            StreamSplit backfillRedoLogSplit, OracleSourceFetchTaskContext 
context) {
         OracleConnectorConfig oracleConnectorConfig =
                 context.getSourceConfig().getDbzConnectorConfig();
         final OffsetContext.Loader<OracleOffsetContext> loader =
                 new LogMinerOracleOffsetContextLoader(oracleConnectorConfig);
         final OracleOffsetContext oracleOffsetContext =
-                
loader.load(backfillBinlogSplit.getStartingOffset().getOffset());
+                
loader.load(backfillRedoLogSplit.getStartingOffset().getOffset());
         // we should only capture events for the current table,
         // otherwise, we may can't find corresponding schema
         Configuration dezConf =
@@ -173,7 +178,7 @@ public class OracleScanFetchTask implements 
FetchTask<SourceSplitBase> {
                         // Disable heartbeat event in snapshot split fetcher
                         .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                         .build();
-        // task to read binlog and backfill for current split
+        // task to read redo log and backfill for current split
         return new RedoLogSplitReadTask(
                 new OracleConnectorConfig(dezConf),
                 
createOracleConnection(context.getSourceConfig().getDbzConfiguration()),
@@ -182,18 +187,18 @@ public class OracleScanFetchTask implements 
FetchTask<SourceSplitBase> {
                 context.getDatabaseSchema(),
                 context.getSourceConfig().getOriginDbzConnectorConfig(),
                 context.getStreamingChangeEventSourceMetrics(),
-                backfillBinlogSplit);
+                backfillRedoLogSplit);
     }
 
-    private void dispatchBinlogEndEvent(
-            StreamSplit backFillBinlogSplit,
+    private void dispatchRedoLogEndEvent(
+            StreamSplit backFillRedoLogSplit,
             Map<String, ?> sourcePartition,
             JdbcSourceEventDispatcher eventDispatcher)
             throws InterruptedException {
         eventDispatcher.dispatchWatermarkEvent(
                 sourcePartition,
-                backFillBinlogSplit,
-                backFillBinlogSplit.getEndingOffset(),
+                backFillRedoLogSplit,
+                backFillRedoLogSplit.getEndingOffset(),
                 WatermarkKind.END);
     }
 
@@ -284,7 +289,7 @@ public class OracleScanFetchTask implements 
FetchTask<SourceSplitBase> {
                     "Snapshot step 3 - Determining high watermark {} for split 
{}",
                     highWatermark,
                     snapshotSplit);
-            ((SnapshotSplitChangeEventSourceContext) 
(context)).setHighWatermark(lowWatermark);
+            ((SnapshotSplitChangeEventSourceContext) 
(context)).setHighWatermark(highWatermark);
             dispatcher.dispatchWatermarkEvent(
                     offsetContext.getPartition(), snapshotSplit, 
highWatermark, WatermarkKind.HIGH);
             return SnapshotResult.completed(ctx.offset);
@@ -462,10 +467,10 @@ public class OracleScanFetchTask implements 
FetchTask<SourceSplitBase> {
     }
 
     /**
-     * The {@link ChangeEventSource.ChangeEventSourceContext} implementation 
for bounded binlog task
+     * The {@link ChangeEventSource.ChangeEventSourceContext} implementation 
for bounded stream task
      * of a snapshot split task.
      */
-    public class SnapshotBinlogSplitChangeEventSourceContext
+    public class SnapshotStreamSplitChangeEventSourceContext
             implements
                 ChangeEventSource.ChangeEventSourceContext {
 
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java
 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java
index 300a007350..e5af201f89 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java
@@ -87,7 +87,7 @@ public class OracleStreamFetchTask implements 
FetchTask<SourceSplitBase> {
     }
 
     /**
-     * A wrapped task to read all binlog for table and also supports read 
bounded (from lowWatermark
+     * A wrapped task to read all redo log for table and also supports read 
bounded (from lowWatermark
      * to highWatermark) binlog.
      */
     public static class RedoLogSplitReadTask extends 
LogMinerStreamingChangeEventSource {
@@ -130,11 +130,11 @@ public class OracleStreamFetchTask implements 
FetchTask<SourceSplitBase> {
         @Override
         public void afterHandleScn(OracleOffsetContext offsetContext) {
             super.afterHandleScn(offsetContext);
-            // check do we need to stop for fetch binlog for snapshot split.
+            // check do we need to stop for fetch redo log for snapshot split.
             if (isBoundedRead()) {
                 final RedoLogOffset currentRedoLogOffset =
                         getCurrentRedoLogOffset(offsetContext.getOffset());
-                // reach the high watermark, the binlog fetcher should be 
finished
+                // reach the high watermark, the redo log fetcher should be 
finished
                 if 
(currentRedoLogOffset.isAtOrAfter(redoLogSplit.getEndingOffset())) {
                     // send binlog end event
                     try {
@@ -148,8 +148,8 @@ public class OracleStreamFetchTask implements 
FetchTask<SourceSplitBase> {
                         errorHandler.setProducerThrowable(
                                 new DebeziumException("Error processing binlog 
signal event", e));
                     }
-                    // tell fetcher the binlog task finished
-                    
((OracleScanFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context)
+                    // tell fetcher the redo log task finished
+                    
((OracleScanFetchTask.SnapshotStreamSplitChangeEventSourceContext) context)
                             .finished();
                 }
             }

Reply via email to