This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new fa13965020 [INLONG-8436][Sort] Fix the backfill task not running bug in oracle cdc connector (#8437) fa13965020 is described below commit fa139650206bd65cd2b8982d295bb065bd4dc391 Author: emhui <111486498+e-m...@users.noreply.github.com> AuthorDate: Wed Aug 16 11:25:45 2023 +0800 [INLONG-8436][Sort] Fix the backfill task not running bug in oracle cdc connector (#8437) --- .../source/reader/fetch/OracleScanFetchTask.java | 53 ++++++++++++---------- .../source/reader/fetch/OracleStreamFetchTask.java | 10 ++-- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java index eb7111a8c7..ef12d46842 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -114,17 +114,17 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> { snapshotSplitReadTask.execute( changeEventSourceContext, sourceFetchContext.getOffsetContext()); - final StreamSplit backfillBinlogSplit = + final StreamSplit backfillRedoLogSplit = createBackfillRedoLogSplit(changeEventSourceContext); - // optimization that skip the binlog read when the low watermark equals high + // optimization that skip the redo log read when the low watermark equals high // watermark - final boolean binlogBackfillRequired = - backfillBinlogSplit + final boolean redoLogBackfillRequired = + backfillRedoLogSplit .getEndingOffset() - .isAfter(backfillBinlogSplit.getStartingOffset()); - if (!binlogBackfillRequired) { - dispatchBinlogEndEvent( - backfillBinlogSplit, + .isAfter(backfillRedoLogSplit.getStartingOffset()); + if (!redoLogBackfillRequired) { + dispatchRedoLogEndEvent( + backfillRedoLogSplit, ((OracleSourceFetchTaskContext) context).getOffsetContext().getPartition(), ((OracleSourceFetchTaskContext) context).getDispatcher()); taskRunning = false; @@ -132,11 +132,16 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> { } // execute redoLog read task if (snapshotResult.isCompletedOrSkipped()) { - final RedoLogSplitReadTask backfillBinlogReadTask = - createBackfillRedoLogReadTask(backfillBinlogSplit, sourceFetchContext); - backfillBinlogReadTask.execute( - new SnapshotBinlogSplitChangeEventSourceContext(), - sourceFetchContext.getOffsetContext()); + final RedoLogSplitReadTask backfillRedoLogReadReTask = + createBackfillRedoLogReadTask(backfillRedoLogSplit, sourceFetchContext); + final LogMinerOracleOffsetContextLoader loader = + new LogMinerOracleOffsetContextLoader( + ((OracleSourceFetchTaskContext) context).getDbzConnectorConfig()); + final OracleOffsetContext oracleOffsetContext = + loader.load(backfillRedoLogSplit.getStartingOffset().getOffset()); + backfillRedoLogReadReTask.execute( + new SnapshotStreamSplitChangeEventSourceContext(), oracleOffsetContext); + taskRunning = false; } else { taskRunning = false; throw new IllegalStateException( @@ -156,13 +161,13 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> { } private RedoLogSplitReadTask createBackfillRedoLogReadTask( - StreamSplit backfillBinlogSplit, OracleSourceFetchTaskContext context) { + StreamSplit backfillRedoLogSplit, OracleSourceFetchTaskContext context) { OracleConnectorConfig oracleConnectorConfig = context.getSourceConfig().getDbzConnectorConfig(); final OffsetContext.Loader<OracleOffsetContext> loader = new LogMinerOracleOffsetContextLoader(oracleConnectorConfig); final OracleOffsetContext oracleOffsetContext = - loader.load(backfillBinlogSplit.getStartingOffset().getOffset()); + loader.load(backfillRedoLogSplit.getStartingOffset().getOffset()); // we should only capture events for the current table, // otherwise, we may can't find corresponding schema Configuration dezConf = @@ -173,7 +178,7 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> { // Disable heartbeat event in snapshot split fetcher .with(Heartbeat.HEARTBEAT_INTERVAL, 0) .build(); - // task to read binlog and backfill for current split + // task to read redo log and backfill for current split return new RedoLogSplitReadTask( new OracleConnectorConfig(dezConf), createOracleConnection(context.getSourceConfig().getDbzConfiguration()), @@ -182,18 +187,18 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> { context.getDatabaseSchema(), context.getSourceConfig().getOriginDbzConnectorConfig(), context.getStreamingChangeEventSourceMetrics(), - backfillBinlogSplit); + backfillRedoLogSplit); } - private void dispatchBinlogEndEvent( - StreamSplit backFillBinlogSplit, + private void dispatchRedoLogEndEvent( + StreamSplit backFillRedoLogSplit, Map<String, ?> sourcePartition, JdbcSourceEventDispatcher eventDispatcher) throws InterruptedException { eventDispatcher.dispatchWatermarkEvent( sourcePartition, - backFillBinlogSplit, - backFillBinlogSplit.getEndingOffset(), + backFillRedoLogSplit, + backFillRedoLogSplit.getEndingOffset(), WatermarkKind.END); } @@ -284,7 +289,7 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> { "Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, snapshotSplit); - ((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(lowWatermark); + ((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(highWatermark); dispatcher.dispatchWatermarkEvent( offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH); return SnapshotResult.completed(ctx.offset); @@ -462,10 +467,10 @@ public class OracleScanFetchTask implements FetchTask<SourceSplitBase> { } /** - * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded binlog task + * The {@link ChangeEventSource.ChangeEventSourceContext} implementation for bounded stream task * of a snapshot split task. */ - public class SnapshotBinlogSplitChangeEventSourceContext + public class SnapshotStreamSplitChangeEventSourceContext implements ChangeEventSource.ChangeEventSourceContext { diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java index 300a007350..e5af201f89 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/source/reader/fetch/OracleStreamFetchTask.java @@ -87,7 +87,7 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> { } /** - * A wrapped task to read all binlog for table and also supports read bounded (from lowWatermark + * A wrapped task to read all redo log for table and also supports read bounded (from lowWatermark * to highWatermark) binlog. */ public static class RedoLogSplitReadTask extends LogMinerStreamingChangeEventSource { @@ -130,11 +130,11 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> { @Override public void afterHandleScn(OracleOffsetContext offsetContext) { super.afterHandleScn(offsetContext); - // check do we need to stop for fetch binlog for snapshot split. + // check do we need to stop for fetch redo log for snapshot split. if (isBoundedRead()) { final RedoLogOffset currentRedoLogOffset = getCurrentRedoLogOffset(offsetContext.getOffset()); - // reach the high watermark, the binlog fetcher should be finished + // reach the high watermark, the redo log fetcher should be finished if (currentRedoLogOffset.isAtOrAfter(redoLogSplit.getEndingOffset())) { // send binlog end event try { @@ -148,8 +148,8 @@ public class OracleStreamFetchTask implements FetchTask<SourceSplitBase> { errorHandler.setProducerThrowable( new DebeziumException("Error processing binlog signal event", e)); } - // tell fetcher the binlog task finished - ((OracleScanFetchTask.SnapshotBinlogSplitChangeEventSourceContext) context) + // tell fetcher the redo log task finished + ((OracleScanFetchTask.SnapshotStreamSplitChangeEventSourceContext) context) .finished(); } }