This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 2f1e6d0e36 [INLONG-10575][Sort] Make mysql source support report audit information exactly once (#10576)
2f1e6d0e36 is described below

commit 2f1e6d0e36a2968b062e22738ce0df95b532977a
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Tue Jul 9 10:17:48 2024 +0800

    [INLONG-10575][Sort] Make mysql source support report audit information exactly once (#10576)
---
 .../mysql/RowDataDebeziumDeserializeSchema.java    |  35 +-
 .../inlong/sort/mysql/source/MySqlSource.java      |   5 +-
 .../mysql/source/reader/MySqlSourceReader.java     | 383 +++++++++++++++++++++
 licenses/inlong-sort-connectors/LICENSE            |   2 +-
 4 files changed, 411 insertions(+), 14 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
index 85e21fe228..0a1f2013f2 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
@@ -19,7 +19,7 @@ package org.apache.inlong.sort.mysql;
 
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricsCollector;
-import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
 
 import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
 import com.ververica.cdc.debezium.table.AppendMetadataCollector;
@@ -53,8 +53,6 @@ import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
@@ -74,8 +72,6 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public final class RowDataDebeziumDeserializeSchema implements DebeziumDeserializationSchema<RowData> {
 
-    private final static Logger LOG = LoggerFactory.getLogger(RowDataDebeziumDeserializeSchema.class);
-
     private static final long serialVersionUID = 2L;
 
     /** Custom validator to validate the row value. */
@@ -106,7 +102,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
 
     /** Changelog Mode to use for encoding changes in Flink internal data structure. */
     private final DebeziumChangelogMode changelogMode;
-    private SourceMetricData sourceMetricData;
+    private SourceExactlyMetric sourceExactlyMetric;
     private final MetricOption metricOption;
 
     /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
@@ -145,7 +141,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             GenericRowData insert = extractAfterRow(value, valueSchema);
             validator.validate(insert, RowKind.INSERT);
             insert.setRowKind(RowKind.INSERT);
-            if (sourceMetricData != null) {
+            if (sourceExactlyMetric != null) {
                 out = createMetricsCollector(record, out);
             }
             emit(record, insert, out);
@@ -153,7 +149,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             GenericRowData delete = extractBeforeRow(value, valueSchema);
             validator.validate(delete, RowKind.DELETE);
             delete.setRowKind(RowKind.DELETE);
-            if (sourceMetricData != null) {
+            if (sourceExactlyMetric != null) {
                 out = createMetricsCollector(record, out);
             }
             emit(record, delete, out);
@@ -168,7 +164,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
             GenericRowData after = extractAfterRow(value, valueSchema);
             validator.validate(after, RowKind.UPDATE_AFTER);
             after.setRowKind(RowKind.UPDATE_AFTER);
-            if (sourceMetricData != null) {
+            if (sourceExactlyMetric != null) {
                 out = createMetricsCollector(record, out);
             }
             emit(record, after, out);
@@ -182,7 +178,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
      * @return metrics collector
      */
     private Collector<RowData> createMetricsCollector(SourceRecord record, Collector<RowData> out) {
-        MetricsCollector<RowData> collector = new MetricsCollector<>(out, sourceMetricData);
+        MetricsCollector<RowData> collector = new MetricsCollector<>(out, sourceExactlyMetric);
         collector.resetTimestamp((Long) ((Struct) record.value()).get(FieldName.TIMESTAMP));
         return collector;
     }
@@ -194,7 +190,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
      */
    public void initSourceMetricData() {
         if (metricOption != null) {
-            this.sourceMetricData = new SourceMetricData(metricOption);
+            this.sourceExactlyMetric = new SourceExactlyMetric(metricOption);
         }
     }
 
@@ -225,6 +221,23 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali
         return resultTypeInfo;
     }
 
+    public void flushAudit() {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.flushAudit();
+        }
+    }
+
+    public void updateCurrentCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
+        }
+    }
+
+    public void updateLastCheckpointId(long checkpointId) {
+        if (sourceExactlyMetric != null) {
+            sourceExactlyMetric.updateLastCheckpointId(checkpointId);
+        }
+    }
     // -------------------------------------------------------------------------------------
     // Builder
     // -------------------------------------------------------------------------------------
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
index 801d172372..da1dcbadae 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.sort.mysql.source;
 
 import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema;
+import org.apache.inlong.sort.mysql.source.reader.MySqlSourceReader;
 
 import com.ververica.cdc.connectors.mysql.MySqlValidator;
 import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
@@ -33,7 +34,6 @@ import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory
 import com.ververica.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator;
 import com.ververica.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
 import com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter;
-import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader;
 import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
 import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
 import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
@@ -167,7 +167,8 @@ public class MySqlSource<T>
                         sourceConfig.isIncludeSchemaChanges()),
                 readerContext.getConfiguration(),
                 mySqlSourceReaderContext,
-                sourceConfig);
+                sourceConfig,
+                deserializationSchema);
     }
 
     @Override
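The deserializer changes above swap SourceMetricData for SourceExactlyMetric and
expose three checkpoint hooks (updateCurrentCheckpointId, flushAudit,
updateLastCheckpointId). A minimal sketch of the intended call sequence,
inferred from the MySqlSourceReader added below; the wrapper class here is
illustrative only and not part of this commit:

    import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema;

    class AuditCheckpointSketch {

        private final RowDataDebeziumDeserializeSchema schema;

        AuditCheckpointSketch(RowDataDebeziumDeserializeSchema schema) {
            this.schema = schema;
        }

        // Called from snapshotState(checkpointId): remember which checkpoint
        // the audit counters accumulated so far belong to.
        void onSnapshotState(long checkpointId) {
            schema.updateCurrentCheckpointId(checkpointId);
        }

        // Called from notifyCheckpointComplete(checkpointId): the checkpoint
        // is durable, so the buffered audit data can be flushed exactly once
        // and the completed-checkpoint watermark advanced.
        void onCheckpointComplete(long checkpointId) {
            schema.flushAudit();
            schema.updateLastCheckpointId(checkpointId);
        }
    }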
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
new file mode 100644
index 0000000000..01f34f28b1
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mysql.source.reader;
+
+import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema;
+
+import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
+import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
+import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaEvent;
+import com.ververica.cdc.connectors.mysql.source.events.BinlogSplitMetaRequestEvent;
+import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsAckEvent;
+import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsReportEvent;
+import com.ververica.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
+import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeEvent;
+import com.ververica.cdc.connectors.mysql.source.events.LatestFinishedSplitsSizeRequestEvent;
+import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderAckEvent;
+import com.ververica.cdc.connectors.mysql.source.events.SuspendBinlogReaderEvent;
+import com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent;
+import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
+import com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReaderContext;
+import com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader;
+import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplitState;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSnapshotSplitState;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
+import com.ververica.cdc.connectors.mysql.source.split.MySqlSplitState;
+import com.ververica.cdc.connectors.mysql.source.split.SourceRecords;
+import com.ververica.cdc.connectors.mysql.source.utils.TableDiscoveryUtils;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import io.debezium.connector.mysql.MySqlConnection;
+import io.debezium.relational.TableId;
+import io.debezium.relational.history.TableChanges;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static com.ververica.cdc.connectors.mysql.source.events.WakeupReaderEvent.WakeUpTarget.SNAPSHOT_READER;
+import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toNormalBinlogSplit;
+import static com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit.toSuspendedBinlogSplit;
+import static com.ververica.cdc.connectors.mysql.source.utils.ChunkUtils.getNextMetaGroupId;
+
+/**
+ * The source reader for MySQL source splits.
+ * Copied from {@link com.ververica.cdc.connectors.mysql.source.reader.MySqlSourceReader}.
+ */
+public class MySqlSourceReader<T>
+        extends
+            SingleThreadMultiplexSourceReaderBase<SourceRecords, T, MySqlSplit, MySqlSplitState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class);
+
+    private final MySqlSourceConfig sourceConfig;
+    private final Map<String, MySqlSnapshotSplit> finishedUnackedSplits;
+    private final Map<String, MySqlBinlogSplit> uncompletedBinlogSplits;
+    private final int subtaskId;
+    private final MySqlSourceReaderContext mySqlSourceReaderContext;
+    private MySqlBinlogSplit suspendedBinlogSplit;
+    private final DebeziumDeserializationSchema<T> metricSchema;
+
+    public MySqlSourceReader(
+            FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementQueue,
+            Supplier<MySqlSplitReader> splitReaderSupplier,
+            RecordEmitter<SourceRecords, T, MySqlSplitState> recordEmitter,
+            Configuration config,
+            MySqlSourceReaderContext context,
+            MySqlSourceConfig sourceConfig,
+            DebeziumDeserializationSchema<T> metricSchema) {
+        super(
+                elementQueue,
+                new SingleThreadFetcherManager<>(elementQueue, splitReaderSupplier::get),
+                recordEmitter,
+                config,
+                context.getSourceReaderContext());
+        this.sourceConfig = sourceConfig;
+        this.finishedUnackedSplits = new HashMap<>();
+        this.uncompletedBinlogSplits = new HashMap<>();
+        this.subtaskId = context.getSourceReaderContext().getIndexOfSubtask();
+        this.mySqlSourceReaderContext = context;
+        this.suspendedBinlogSplit = null;
+        this.metricSchema = metricSchema;
+    }
+
+    @Override
+    public void start() {
+        if (getNumberOfCurrentlyAssignedSplits() == 0) {
+            context.sendSplitRequest();
+        }
+    }
+
+    @Override
+    protected MySqlSplitState initializedState(MySqlSplit split) {
+        if (split.isSnapshotSplit()) {
+            return new MySqlSnapshotSplitState(split.asSnapshotSplit());
+        } else {
+            return new MySqlBinlogSplitState(split.asBinlogSplit());
+        }
+    }
+
+    @Override
+    public List<MySqlSplit> snapshotState(long checkpointId) {
+        if (metricSchema instanceof RowDataDebeziumDeserializeSchema) {
+            ((RowDataDebeziumDeserializeSchema) metricSchema).updateCurrentCheckpointId(checkpointId);
+        }
+        List<MySqlSplit> stateSplits = super.snapshotState(checkpointId);
+
+        // unfinished splits
+        List<MySqlSplit> unfinishedSplits =
+                stateSplits.stream()
+                        .filter(split -> !finishedUnackedSplits.containsKey(split.splitId()))
+                        .collect(Collectors.toList());
+
+        // add finished snapshot splits that didn't receive ack yet
+        unfinishedSplits.addAll(finishedUnackedSplits.values());
+
+        // add binlog splits that are uncompleted
+        unfinishedSplits.addAll(uncompletedBinlogSplits.values());
+
+        // add the suspended binlog split
+        if (suspendedBinlogSplit != null) {
+            unfinishedSplits.add(suspendedBinlogSplit);
+        }
+
+        logCurrentBinlogOffsets(unfinishedSplits, checkpointId);
+
+        return unfinishedSplits;
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+        super.notifyCheckpointComplete(checkpointId);
+        if (metricSchema instanceof RowDataDebeziumDeserializeSchema) {
+            RowDataDebeziumDeserializeSchema schema = (RowDataDebeziumDeserializeSchema) metricSchema;
+            schema.flushAudit();
+            schema.updateLastCheckpointId(checkpointId);
+        }
+    }
+
+    @Override
+    protected void onSplitFinished(Map<String, MySqlSplitState> finishedSplitIds) {
+        boolean requestNextSplit = true;
+        for (MySqlSplitState mySqlSplitState : finishedSplitIds.values()) {
+            MySqlSplit mySqlSplit = mySqlSplitState.toMySqlSplit();
+            if (mySqlSplit.isBinlogSplit()) {
+                LOG.info(
+                        "binlog split reader suspended due to newly added table, offset {}",
+                        mySqlSplitState.asBinlogSplitState().getStartingOffset());
+
+                mySqlSourceReaderContext.resetStopBinlogSplitReader();
+                suspendedBinlogSplit = toSuspendedBinlogSplit(mySqlSplit.asBinlogSplit());
+                context.sendSourceEventToCoordinator(new SuspendBinlogReaderAckEvent());
+                // do not request the next split when the reader is suspended; the suspended
+                // reader will automatically request the next split after it has been woken up
+                requestNextSplit = false;
+            } else {
+                finishedUnackedSplits.put(mySqlSplit.splitId(), mySqlSplit.asSnapshotSplit());
+            }
+        }
+        reportFinishedSnapshotSplitsIfNeed();
+        if (requestNextSplit) {
+            context.sendSplitRequest();
+        }
+    }
+
+    @Override
+    public void addSplits(List<MySqlSplit> splits) {
+        // restore for finishedUnackedSplits
+        List<MySqlSplit> unfinishedSplits = new ArrayList<>();
+        for (MySqlSplit split : splits) {
+            LOG.info("Add Split: " + split);
+            if (split.isSnapshotSplit()) {
+                MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
+                if (snapshotSplit.isSnapshotReadFinished()) {
+                    finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
+                } else {
+                    unfinishedSplits.add(split);
+                }
+            } else {
+                MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
+                // the binlog split is suspended
+                if (binlogSplit.isSuspended()) {
+                    suspendedBinlogSplit = binlogSplit;
+                } else if (!binlogSplit.isCompletedSplit()) {
+                    uncompletedBinlogSplits.put(split.splitId(), split.asBinlogSplit());
+                    requestBinlogSplitMetaIfNeeded(split.asBinlogSplit());
+                } else {
+                    uncompletedBinlogSplits.remove(split.splitId());
+                    MySqlBinlogSplit mySqlBinlogSplit =
+                            discoverTableSchemasForBinlogSplit(split.asBinlogSplit());
+                    unfinishedSplits.add(mySqlBinlogSplit);
+                }
+            }
+        }
+        // notify the split enumerator again about the finished unacked snapshot splits
+        reportFinishedSnapshotSplitsIfNeed();
+        // add all unfinished splits (including the binlog split) to SourceReaderBase
+        if (!unfinishedSplits.isEmpty()) {
+            super.addSplits(unfinishedSplits);
+        }
+    }
+
+    private MySqlBinlogSplit discoverTableSchemasForBinlogSplit(MySqlBinlogSplit split) {
+        final String splitId = split.splitId();
+        if (split.getTableSchemas().isEmpty()) {
+            try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) {
+                Map<TableId, TableChanges.TableChange> tableSchemas =
+                        TableDiscoveryUtils.discoverCapturedTableSchemas(sourceConfig, jdbc);
+                LOG.info("The table schema discovery for binlog split {} succeeded", splitId);
+                return MySqlBinlogSplit.fillTableSchemas(split, tableSchemas);
+            } catch (SQLException e) {
+                LOG.error("Failed to obtain table schemas due to {}", e.getMessage());
+                throw new FlinkRuntimeException(e);
+            }
+        } else {
+            LOG.warn(
+                    "The binlog split {} already has table schemas, skip the table schema discovery",
+                    split);
+            return split;
+        }
+    }
+
+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {
+        if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
+            FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
+            LOG.debug(
+                    "The subtask {} receives ack event for {} from enumerator.",
+                    subtaskId,
+                    ackEvent.getFinishedSplits());
+            for (String splitId : ackEvent.getFinishedSplits()) {
+                this.finishedUnackedSplits.remove(splitId);
+            }
+        } else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
+            // report finished snapshot splits
+            LOG.debug(
+                    "The subtask {} receives request to report finished snapshot splits.",
+                    subtaskId);
+            reportFinishedSnapshotSplitsIfNeed();
+        } else if (sourceEvent instanceof BinlogSplitMetaEvent) {
+            LOG.debug(
+                    "The subtask {} receives binlog meta with group id {}.",
+                    subtaskId,
+                    ((BinlogSplitMetaEvent) sourceEvent).getMetaGroupId());
+            fillMetaDataForBinlogSplit((BinlogSplitMetaEvent) sourceEvent);
+        } else if (sourceEvent instanceof SuspendBinlogReaderEvent) {
+            mySqlSourceReaderContext.setStopBinlogSplitReader();
+        } else if (sourceEvent instanceof WakeupReaderEvent) {
+            WakeupReaderEvent wakeupReaderEvent = (WakeupReaderEvent) sourceEvent;
+            if (wakeupReaderEvent.getTarget() == SNAPSHOT_READER) {
+                context.sendSplitRequest();
+            } else {
+                if (suspendedBinlogSplit != null) {
+                    context.sendSourceEventToCoordinator(
+                            new LatestFinishedSplitsSizeRequestEvent());
+                }
+            }
+        } else if (sourceEvent instanceof LatestFinishedSplitsSizeEvent) {
+            if (suspendedBinlogSplit != null) {
+                final int finishedSplitsSize =
+                        ((LatestFinishedSplitsSizeEvent) sourceEvent).getLatestFinishedSplitsSize();
+                final MySqlBinlogSplit binlogSplit =
+                        toNormalBinlogSplit(suspendedBinlogSplit, finishedSplitsSize);
+                suspendedBinlogSplit = null;
+                this.addSplits(Collections.singletonList(binlogSplit));
+            }
+        } else {
+            super.handleSourceEvents(sourceEvent);
+        }
+    }
+
+    private void reportFinishedSnapshotSplitsIfNeed() {
+        if (!finishedUnackedSplits.isEmpty()) {
+            final Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
+            for (MySqlSnapshotSplit split : finishedUnackedSplits.values()) {
+                finishedOffsets.put(split.splitId(), split.getHighWatermark());
+            }
+            FinishedSnapshotSplitsReportEvent reportEvent =
+                    new FinishedSnapshotSplitsReportEvent(finishedOffsets);
+            context.sendSourceEventToCoordinator(reportEvent);
+            LOG.debug(
+                    "The subtask {} reports offsets of finished snapshot splits {}.",
+                    subtaskId,
+                    finishedOffsets);
+        }
+    }
+
+    private void requestBinlogSplitMetaIfNeeded(MySqlBinlogSplit binlogSplit) {
+        final String splitId = binlogSplit.splitId();
+        if (!binlogSplit.isCompletedSplit()) {
+            final int nextMetaGroupId =
+                    getNextMetaGroupId(
+                            binlogSplit.getFinishedSnapshotSplitInfos().size(),
+                            sourceConfig.getSplitMetaGroupSize());
+            BinlogSplitMetaRequestEvent splitMetaRequestEvent =
+                    new BinlogSplitMetaRequestEvent(splitId, nextMetaGroupId);
+            context.sendSourceEventToCoordinator(splitMetaRequestEvent);
+        } else {
+            LOG.info("The meta of binlog split {} has been collected successfully", splitId);
+            this.addSplits(Collections.singletonList(binlogSplit));
+        }
+    }
+
+    private void fillMetaDataForBinlogSplit(BinlogSplitMetaEvent metadataEvent) {
+        MySqlBinlogSplit binlogSplit = uncompletedBinlogSplits.get(metadataEvent.getSplitId());
+        if (binlogSplit != null) {
+            final int receivedMetaGroupId = metadataEvent.getMetaGroupId();
+            final int expectedMetaGroupId =
+                    getNextMetaGroupId(
+                            binlogSplit.getFinishedSnapshotSplitInfos().size(),
+                            sourceConfig.getSplitMetaGroupSize());
+            if (receivedMetaGroupId == expectedMetaGroupId) {
+                List<FinishedSnapshotSplitInfo> metaDataGroup =
+                        metadataEvent.getMetaGroup().stream()
+                                .map(FinishedSnapshotSplitInfo::deserialize)
+                                .collect(Collectors.toList());
+                uncompletedBinlogSplits.put(
+                        binlogSplit.splitId(),
+                        MySqlBinlogSplit.appendFinishedSplitInfos(binlogSplit, metaDataGroup));
+
+                LOG.info("Fill meta data of group {} to binlog split", metaDataGroup.size());
+            } else {
+                LOG.warn(
+                        "Received out of order binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it",
+                        metadataEvent.getSplitId(),
+                        receivedMetaGroupId,
+                        expectedMetaGroupId);
+            }
+            requestBinlogSplitMetaIfNeeded(uncompletedBinlogSplits.get(binlogSplit.splitId()));
+        } else {
+            LOG.warn(
+                    "Received binlog meta event for split {}, but the uncompleted split map does not contain it",
+                    metadataEvent.getSplitId());
+        }
+    }
+
+    private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) {
+        if (!LOG.isInfoEnabled()) {
+            return;
+        }
+        for (MySqlSplit split : splits) {
+            if (!split.isBinlogSplit()) {
+                return;
+            }
+            BinlogOffset offset = split.asBinlogSplit().getStartingOffset();
+            LOG.info("Binlog offset on checkpoint {}: {}", checkpointId, offset);
+        }
+    }
+
+    @Override
+    protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) {
+        return splitState.toMySqlSplit();
+    }
+}
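The reader above defers audit flushing to notifyCheckpointComplete so that
reported counts always correspond to a durable checkpoint. A self-contained toy
model of that per-checkpoint buffering (hypothetical; SourceExactlyMetric's
real internals are not part of this commit):

    import java.util.HashMap;
    import java.util.Map;

    class PerCheckpointAuditBuffer {

        private final Map<Long, Long> pendingCounts = new HashMap<>();
        private long currentCheckpointId;

        // snapshotState: audit counts recorded from now on belong to this checkpoint
        void setCurrentCheckpointId(long checkpointId) {
            currentCheckpointId = checkpointId;
        }

        // deserialize path: accumulate counts under the current checkpoint id
        void record(long rowCount) {
            pendingCounts.merge(currentCheckpointId, rowCount, Long::sum);
        }

        // notifyCheckpointComplete: report and drop everything covered by the
        // completed checkpoint, so a restart from that checkpoint never
        // double-reports the same rows
        void flushCompleted(long completedCheckpointId) {
            pendingCounts.entrySet().removeIf(entry -> {
                if (entry.getKey() <= completedCheckpointId) {
                    System.out.printf("audit report: checkpoint=%d rows=%d%n",
                            entry.getKey(), entry.getValue());
                    return true;
                }
                return false;
            });
        }
    }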
+ "Received out of oder binlog meta event for split {}, the received meta group id is {}, but expected is {}, ignore it", + metadataEvent.getSplitId(), + receivedMetaGroupId, + expectedMetaGroupId); + } + requestBinlogSplitMetaIfNeeded(uncompletedBinlogSplits.get(binlogSplit.splitId())); + } else { + LOG.warn( + "Received binlog meta event for split {}, but the uncompleted split map does not contain it", + metadataEvent.getSplitId()); + } + } + + private void logCurrentBinlogOffsets(List<MySqlSplit> splits, long checkpointId) { + if (!LOG.isInfoEnabled()) { + return; + } + for (MySqlSplit split : splits) { + if (!split.isBinlogSplit()) { + return; + } + BinlogOffset offset = split.asBinlogSplit().getStartingOffset(); + LOG.info("Binlog offset on checkpoint {}: {}", checkpointId, offset); + } + } + + @Override + protected MySqlSplit toSplitType(String splitId, MySqlSplitState splitState) { + return splitState.toMySqlSplit(); + } +} diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index e4c4590fd4..b7c89b9245 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -920,7 +920,7 @@ License : https://github.com/apache/hudi/blob/master/LICENSE inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSourceBuilder.java inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/MySqlSource.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/table/MySqlReadableMetadata.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java Source : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the software have been modified.) License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE