This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 0c3044e3f6 [Hotfix][CDC] Fix split schema change stream (#7003) 0c3044e3f6 is described below commit 0c3044e3f6d5d1a52df1896e069676b509419e4b Author: hailin0 <wanghai...@apache.org> AuthorDate: Tue Jun 18 14:29:33 2024 +0800 [Hotfix][CDC] Fix split schema change stream (#7003) --- .../external/IncrementalSourceStreamFetcher.java | 157 ++++++++++++--------- .../IncrementalSourceStreamFetcherTest.java | 23 ++- 2 files changed, 115 insertions(+), 65 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java index 4cad739ac6..338cb657b3 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcher.java @@ -171,69 +171,7 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So * checkpoint-after] [a, b, c, d, e] */ Iterator<SourceRecords> splitSchemaChangeStream(List<DataChangeEvent> batchEvents) { - List<SourceRecords> sourceRecordsSet = new ArrayList<>(); - - List<SourceRecord> sourceRecordList = new ArrayList<>(); - SourceRecord previousRecord = null; - for (int i = 0; i < batchEvents.size(); i++) { - DataChangeEvent event = batchEvents.get(i); - SourceRecord currentRecord = event.getRecord(); - if (!shouldEmit(currentRecord)) { - continue; - } - - if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) { - if (!schemaChangeResolver.support(currentRecord)) { - continue; - } - - if (previousRecord == null) { - // add schema-change-before to first - sourceRecordList.add( - WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord)); - sourceRecordsSet.add(new SourceRecords(sourceRecordList)); - sourceRecordList = new ArrayList<>(); - sourceRecordList.add(currentRecord); - } else if (SourceRecordUtils.isSchemaChangeEvent(previousRecord)) { - sourceRecordList.add(currentRecord); - } else { - sourceRecordList.add( - WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord)); - sourceRecordsSet.add(new SourceRecords(sourceRecordList)); - sourceRecordList = new ArrayList<>(); - sourceRecordList.add(currentRecord); - } - } else if (SourceRecordUtils.isDataChangeRecord(currentRecord) - || SourceRecordUtils.isHeartbeatRecord(currentRecord)) { - if (previousRecord == null - || SourceRecordUtils.isDataChangeRecord(previousRecord) - || SourceRecordUtils.isHeartbeatRecord(previousRecord)) { - sourceRecordList.add(currentRecord); - } else { - sourceRecordList.add( - WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord)); - sourceRecordsSet.add(new SourceRecords(sourceRecordList)); - sourceRecordList = new ArrayList<>(); - sourceRecordList.add(currentRecord); - } - } - previousRecord = currentRecord; - if (i == batchEvents.size() - 1) { - if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) { - sourceRecordList.add( - WatermarkEvent.createSchemaChangeAfterWatermark(currentRecord)); - } - sourceRecordsSet.add(new SourceRecords(sourceRecordList)); - } - } - - if (sourceRecordsSet.size() > 1) { - log.debug( - "Split events stream into {} batches and mark schema checkpoint before/after", - sourceRecordsSet.size()); - } - - return sourceRecordsSet.iterator(); + return new SchemaChangeStreamSplitter().split(batchEvents); } private void checkReadException() { @@ -349,4 +287,97 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap; this.pureBinlogPhaseTables.clear(); } + + class SchemaChangeStreamSplitter { + private List<SourceRecords> blockSet; + private List<SourceRecord> currentBlock; + private SourceRecord previousRecord; + + public SchemaChangeStreamSplitter() { + blockSet = new ArrayList<>(); + currentBlock = new ArrayList<>(); + previousRecord = null; + } + + public Iterator<SourceRecords> split(List<DataChangeEvent> batchEvents) { + for (int i = 0; i < batchEvents.size(); i++) { + DataChangeEvent event = batchEvents.get(i); + SourceRecord currentRecord = event.getRecord(); + if (!shouldEmit(currentRecord)) { + continue; + } + + if (SourceRecordUtils.isSchemaChangeEvent(currentRecord)) { + if (!schemaChangeResolver.support(currentRecord)) { + continue; + } + + if (previousRecord == null) { + // add schema-change-before to first + currentBlock.add( + WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord)); + flipBlock(); + + currentBlock.add(currentRecord); + } else if (SourceRecordUtils.isSchemaChangeEvent(previousRecord)) { + currentBlock.add(currentRecord); + } else { + currentBlock.add( + WatermarkEvent.createSchemaChangeBeforeWatermark(currentRecord)); + flipBlock(); + + currentBlock.add(currentRecord); + } + } else if (SourceRecordUtils.isDataChangeRecord(currentRecord) + || SourceRecordUtils.isHeartbeatRecord(currentRecord)) { + if (previousRecord == null + || SourceRecordUtils.isDataChangeRecord(previousRecord) + || SourceRecordUtils.isHeartbeatRecord(previousRecord)) { + currentBlock.add(currentRecord); + } else { + endBlock(previousRecord); + flipBlock(); + + currentBlock.add(currentRecord); + } + } + + previousRecord = currentRecord; + if (i == batchEvents.size() - 1) { + endBlock(currentRecord); + flipBlock(); + } + } + + endLastBlock(previousRecord); + + if (blockSet.size() > 1) { + log.debug( + "Split events stream into {} batches and mark schema change checkpoint", + blockSet.size()); + } + + return blockSet.iterator(); + } + + void flipBlock() { + if (!currentBlock.isEmpty()) { + blockSet.add(new SourceRecords(currentBlock)); + currentBlock = new ArrayList<>(); + } + } + + void endBlock(SourceRecord lastRecord) { + if (!currentBlock.isEmpty()) { + if (SourceRecordUtils.isSchemaChangeEvent(lastRecord)) { + currentBlock.add(WatermarkEvent.createSchemaChangeAfterWatermark(lastRecord)); + } + } + } + + void endLastBlock(SourceRecord lastRecord) { + endBlock(lastRecord); + flipBlock(); + } + } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java index a17f7f8673..ee8d4d7e5d 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/reader/external/IncrementalSourceStreamFetcherTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.source.SourceRecord; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.mockito.stubbing.Answer; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; @@ -61,6 +62,7 @@ public class IncrementalSourceStreamFetcherTest { .with(Heartbeat.HEARTBEAT_INTERVAL, 1) .with(TRANSACTION_TOPIC, "test") .build(); + private static final String UNKNOWN_SCHEMA_KEY = "UNKNOWN"; @Test public void testSplitSchemaChangeStream() throws Exception { @@ -107,6 +109,7 @@ public class IncrementalSourceStreamFetcherTest { inputEvents.add(new DataChangeEvent(createDataEvent())); inputEvents.add(new DataChangeEvent(createSchemaChangeEvent())); inputEvents.add(new DataChangeEvent(createSchemaChangeEvent())); + inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent())); outputEvents = fetcher.splitSchemaChangeStream(inputEvents); outputEvents.forEachRemaining(records::add); @@ -134,6 +137,7 @@ public class IncrementalSourceStreamFetcherTest { inputEvents.add(new DataChangeEvent(createSchemaChangeEvent())); inputEvents.add(new DataChangeEvent(createDataEvent())); inputEvents.add(new DataChangeEvent(createDataEvent())); + inputEvents.add(new DataChangeEvent(createSchemaChangeUnknownEvent())); outputEvents = fetcher.splitSchemaChangeStream(inputEvents); outputEvents.forEachRemaining(records::add); @@ -323,13 +327,21 @@ public class IncrementalSourceStreamFetcherTest { } static SourceRecord createSchemaChangeEvent() { + return createSchemaChangeEvent("SCHEMA_CHANGE_TOPIC"); + } + + static SourceRecord createSchemaChangeUnknownEvent() { + return createSchemaChangeEvent(UNKNOWN_SCHEMA_KEY); + } + + static SourceRecord createSchemaChangeEvent(String topic) { Schema keySchema = SchemaBuilder.struct().name(SourceRecordUtils.SCHEMA_CHANGE_EVENT_KEY_NAME).build(); SourceRecord record = new SourceRecord( Collections.emptyMap(), Collections.emptyMap(), - null, + topic, keySchema, null, null, @@ -377,7 +389,14 @@ public class IncrementalSourceStreamFetcherTest { static IncrementalSourceStreamFetcher createFetcher() { SchemaChangeResolver schemaChangeResolver = mock(SchemaChangeResolver.class); - when(schemaChangeResolver.support(any())).thenReturn(true); + when(schemaChangeResolver.support(any())) + .thenAnswer( + (Answer<Boolean>) + invocationOnMock -> { + SourceRecord record = invocationOnMock.getArgument(0); + return record.topic() == null + || !record.topic().equalsIgnoreCase(UNKNOWN_SCHEMA_KEY); + }); IncrementalSourceStreamFetcher fetcher = new IncrementalSourceStreamFetcher(null, 0, schemaChangeResolver); IncrementalSourceStreamFetcher spy = spy(fetcher);