This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
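The commit recorded below addresses INLONG-10340 by making the MongoDB deserialization schema safe to serialize: the copied DebeziumDeserializationSchema interface gains an open() hook, DebeziumSourceFunction#open() invokes it on the task side, and MongoDBConnectorDeserializationSchema now keeps only the serializable MetricOption as a field and builds SourceMetricData (which carries the AuditOperator named in the issue title) inside open() instead of the constructor. The sketch below illustrates that pattern; it is not part of the patch, and the transient modifier, the simplified deserialize() and getProducedType() bodies, and the class name are editorial assumptions.

package org.apache.inlong.sort.mongodb;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceMetricData;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Illustrative sketch only: shows the "defer non-serializable state to open()" pattern
 * that MongoDBConnectorDeserializationSchema adopts in this commit.
 */
public class MetricReportingDeserializationSchema implements DebeziumDeserializationSchema<RowData> {

    private static final long serialVersionUID = 1L;

    /** Serializable configuration, safe to keep as an instance field. */
    private final MetricOption metricOption;

    /** Runtime metric/audit helper; built in open(), never shipped with the job graph. */
    private transient SourceMetricData sourceMetricData;

    public MetricReportingDeserializationSchema(MetricOption metricOption) {
        // Do not build SourceMetricData here: the constructor runs on the client and
        // this object is serialized into the job graph (the problem behind INLONG-10340).
        this.metricOption = metricOption;
    }

    @Override
    public void open() {
        // Called on the task manager after the schema has been deserialized,
        // so the audit/metric state is created where it actually runs.
        if (metricOption != null) {
            sourceMetricData = new SourceMetricData(metricOption);
        }
    }

    @Override
    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
        // Conversion of the change event is omitted; in the patch the collector is wrapped
        // so emitted rows are also counted, e.g.:
        // emit(record, row, sourceMetricData == null ? out : new MetricsCollector<>(out, sourceMetricData));
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return TypeInformation.of(RowData.class);
    }
}

In the actual patch, DebeziumSourceFunction#open() calls deserializer.open() just before the Debezium engine threads are created, which is the hook that makes this deferred initialization work.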
The following commit(s) were added to refs/heads/master by this push: new 24beb99161 [INLONG-10340][Sort] Fix MongoDB AuditOperator not serialized (#10343) 24beb99161 is described below commit 24beb99161cd5fce8d74d5f3bef104896f93cc8f Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com> AuthorDate: Wed Jun 5 11:37:35 2024 +0800 [INLONG-10340][Sort] Fix MongoDB AuditOperator not serialized (#10343) --- .../inlong/sort/mongodb/DebeziumChangeFetcher.java | 322 ++++++++++++ .../mongodb/DebeziumDeserializationSchema.java | 42 ++ .../sort/mongodb/DebeziumSourceFunction.java | 563 +++++++++++++++++++++ .../MongoDBConnectorDeserializationSchema.java | 12 +- .../apache/inlong/sort/mongodb/MongoDBSource.java | 344 +++++++++++++ .../inlong/sort/mongodb/MongoDBTableSource.java | 9 +- .../sort/mongodb/source/IncrementalSource.java | 217 ++++++++ .../source/IncrementalSourceRecordEmitter.java | 176 +++++++ .../sort/mongodb/source/MongoDBRecordEmitter.java | 111 ++++ .../inlong/sort/mongodb/source/MongoDBSource.java | 95 ++++ .../sort/mongodb/source/MongoDBSourceBuilder.java | 201 ++++++++ licenses/inlong-sort-connectors/LICENSE | 14 +- 12 files changed, 2096 insertions(+), 10 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumChangeFetcher.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumChangeFetcher.java new file mode 100644 index 0000000000..b5d06500fe --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumChangeFetcher.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.mongodb; + +import com.ververica.cdc.debezium.internal.DebeziumOffset; +import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer; +import com.ververica.cdc.debezium.internal.Handover; +import io.debezium.connector.SnapshotRecord; +import io.debezium.data.Envelope; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Collector; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +/** + * A Handler that convert change messages from {@link DebeziumEngine} to data in Flink. Considering + * Debezium in different mode has different strategies to hold the lock, e.g. snapshot, the handler + * also needs different strategy. In snapshot phase, the handler needs to hold the lock until the + * snapshot finishes. But in non-snapshot phase, the handler only needs to hold the lock when + * emitting the records. + * + * @param <T> The type of elements produced by the handler. + * <p> + * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0 + */ +@Internal +public class DebeziumChangeFetcher<T> { + + private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class); + + private final SourceFunction.SourceContext<T> sourceContext; + + /** + * The lock that guarantees that record emission and state updates are atomic, from the view of + * taking a checkpoint. + */ + private final Object checkpointLock; + + /** The schema to convert from Debezium's messages into Flink's objects. */ + private final DebeziumDeserializationSchema<T> deserialization; + + /** A collector to emit records in batch (bundle). */ + private final DebeziumCollector debeziumCollector; + + private final DebeziumOffset debeziumOffset; + + private final DebeziumOffsetSerializer stateSerializer; + + private final String heartbeatTopicPrefix; + + private boolean isInDbSnapshotPhase; + + private final Handover handover; + + private volatile boolean isRunning = true; + + // --------------------------------------------------------------------------------------- + // Metrics + // --------------------------------------------------------------------------------------- + + /** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */ + private volatile long messageTimestamp = 0L; + + /** The last record processing time. */ + private volatile long processTime = 0L; + + /** + * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the + * record fetched into the source operator. + */ + private volatile long fetchDelay = 0L; + + /** + * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the + * source operator. 
+ */ + private volatile long emitDelay = 0L; + + // ------------------------------------------------------------------------ + + public DebeziumChangeFetcher( + SourceFunction.SourceContext<T> sourceContext, + DebeziumDeserializationSchema<T> deserialization, + boolean isInDbSnapshotPhase, + String heartbeatTopicPrefix, + Handover handover) { + this.sourceContext = sourceContext; + this.checkpointLock = sourceContext.getCheckpointLock(); + this.deserialization = deserialization; + this.isInDbSnapshotPhase = isInDbSnapshotPhase; + this.heartbeatTopicPrefix = heartbeatTopicPrefix; + this.debeziumCollector = new DebeziumCollector(); + this.debeziumOffset = new DebeziumOffset(); + this.stateSerializer = DebeziumOffsetSerializer.INSTANCE; + this.handover = handover; + } + + /** + * Take a snapshot of the Debezium handler state. + * + * <p>Important: This method must be called under the checkpoint lock. + */ + public byte[] snapshotCurrentState() throws Exception { + // this method assumes that the checkpoint lock is held + assert Thread.holdsLock(checkpointLock); + if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) { + return null; + } + + return stateSerializer.serialize(debeziumOffset); + } + + /** + * Process change messages from the {@link Handover} and collect the processed messages by + * {@link Collector}. + */ + public void runFetchLoop() throws Exception { + try { + // begin snapshot database phase + if (isInDbSnapshotPhase) { + List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext(); + + synchronized (checkpointLock) { + LOG.info( + "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock."); + handleBatch(events); + while (isRunning && isInDbSnapshotPhase) { + handleBatch(handover.pollNext()); + } + } + LOG.info("Received record from streaming binlog phase, released checkpoint lock."); + } + + // begin streaming binlog phase + while (isRunning) { + // If the handover is closed or has errors, exit. + // If there is no streaming phase, the handover will be closed by the engine. + handleBatch(handover.pollNext()); + } + } catch (Handover.ClosedException e) { + // ignore + } catch (RetriableException e) { + // Retriable exception should be ignored by DebeziumChangeFetcher, + // refer https://issues.redhat.com/browse/DBZ-2531 for more information. + // Because Retriable exception is ignored by the DebeziumEngine and + // the retry is handled in io.debezium.connector.common.BaseSourceTask.poll() + LOG.info( + "Ignore the RetriableException, the underlying DebeziumEngine will restart automatically", + e); + } + } + + public void close() { + isRunning = false; + handover.close(); + } + + // --------------------------------------------------------------------------------------- + // Metric getter + // --------------------------------------------------------------------------------------- + + /** + * The metric indicates delay from data generation to entry into the system. + * + * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is + * unavailable. + */ + public long getFetchDelay() { + return fetchDelay; + } + + /** + * The metric indicates delay from data generation to leaving the source operator. + * + * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is + * unavailable. 
+ */ + public long getEmitDelay() { + return emitDelay; + } + + public long getIdleTime() { + return System.currentTimeMillis() - processTime; + } + + // --------------------------------------------------------------------------------------- + // Helper + // --------------------------------------------------------------------------------------- + + private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents) + throws Exception { + if (CollectionUtils.isEmpty(changeEvents)) { + return; + } + this.processTime = System.currentTimeMillis(); + + for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) { + SourceRecord record = event.value(); + updateMessageTimestamp(record); + fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp; + + if (isHeartbeatEvent(record)) { + // keep offset update + synchronized (checkpointLock) { + debeziumOffset.setSourcePartition(record.sourcePartition()); + debeziumOffset.setSourceOffset(record.sourceOffset()); + } + // drop heartbeat events + continue; + } + + deserialization.deserialize(record, debeziumCollector); + + if (!isSnapshotRecord(record)) { + LOG.debug("Snapshot phase finishes."); + isInDbSnapshotPhase = false; + } + + // emit the actual records. this also updates offset state atomically + emitRecordsUnderCheckpointLock( + debeziumCollector.records, record.sourcePartition(), record.sourceOffset()); + } + } + + private void emitRecordsUnderCheckpointLock( + Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) { + // Emit the records. Use the checkpoint lock to guarantee + // atomicity of record emission and offset state update. + // The synchronized checkpointLock is reentrant. It's safe to sync again in snapshot mode. + synchronized (checkpointLock) { + T record; + while ((record = records.poll()) != null) { + emitDelay = + isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp; + sourceContext.collect(record); + } + // update offset to state + debeziumOffset.setSourcePartition(sourcePartition); + debeziumOffset.setSourceOffset(sourceOffset); + } + } + + private void updateMessageTimestamp(SourceRecord record) { + Schema schema = record.valueSchema(); + Struct value = (Struct) record.value(); + if (schema.field(Envelope.FieldName.SOURCE) == null) { + return; + } + + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) { + return; + } + + Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP); + if (tsMs != null) { + this.messageTimestamp = tsMs; + } + } + + private boolean isHeartbeatEvent(SourceRecord record) { + String topic = record.topic(); + return topic != null && topic.startsWith(heartbeatTopicPrefix); + } + + private boolean isSnapshotRecord(SourceRecord record) { + Struct value = (Struct) record.value(); + if (value != null) { + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source); + // even if it is the last record of snapshot, i.e. 
SnapshotRecord.LAST + // we can still recover from checkpoint and continue to read the binlog, + // because the checkpoint contains binlog position + return SnapshotRecord.TRUE == snapshotRecord; + } + return false; + } + + // --------------------------------------------------------------------------------------- + + private class DebeziumCollector implements Collector<T> { + + private final Queue<T> records = new ArrayDeque<>(); + + @Override + public void collect(T record) { + records.add(record); + } + + @Override + public void close() { + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumDeserializationSchema.java new file mode 100644 index 0000000000..982adfd542 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumDeserializationSchema.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.mongodb; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; +import org.apache.kafka.connect.source.SourceRecord; + +import java.io.Serializable; + +/** + * The deserialization schema describes how to turn the Debezium SourceRecord into data types + * (Java/Scala objects) that are processed by Flink. + * + * @param <T> The type created by the deserialization schema. + * <p> + * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0 + */ +@PublicEvolving +public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + + void open(); + + /** Deserialize the Debezium record, it is represented in Kafka {@link SourceRecord}. */ + void deserialize(SourceRecord record, Collector<T> out) throws Exception; +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java new file mode 100644 index 0000000000..daaaace1b0 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.mongodb; + +import com.ververica.cdc.debezium.Validator; +import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer; +import com.ververica.cdc.debezium.internal.DebeziumOffset; +import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer; +import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory; +import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory; +import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore; +import com.ververica.cdc.debezium.internal.Handover; +import com.ververica.cdc.debezium.internal.SchemaRecord; +import io.debezium.document.DocumentReader; +import io.debezium.document.DocumentWriter; +import io.debezium.embedded.Connect; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.spi.OffsetCommitPolicy; +import io.debezium.heartbeat.Heartbeat; +import org.apache.commons.collections.map.LinkedMap; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; +import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; + +/** + * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data + * from databases into Flink. + * + * <p>There are two workers during the runtime. 
One worker periodically pulls records from the + * database and pushes the records into the {@link Handover}. The other worker consumes the records + * from the {@link Handover} and convert the records to the data in Flink style. The reason why + * don't use one workers is because debezium has different behaviours in snapshot phase and + * streaming phase. + * + * <p>Here we use the {@link Handover} as the buffer to submit data from the producer to the + * consumer. Because the two threads don't communicate to each other directly, the error reporting + * also relies on {@link Handover}. When the engine gets errors, the engine uses the {@link + * DebeziumEngine.CompletionCallback} to report errors to the {@link Handover} and wakes up the + * consumer to check the error. However, the source function just closes the engine and wakes up the + * producer if the error is from the Flink side. + * + * <p>If the execution is canceled or finish(only snapshot phase), the exit logic is as same as the + * logic in the error reporting. + * + * <p>The source function participates in checkpointing and guarantees that no data is lost during a + * failure, and that the computation processes elements "exactly once". + * + * <p>Note: currently, the source function can't run in multiple parallel instances. + * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0 + * <p> + * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0 + */ +@PublicEvolving +public class DebeziumSourceFunction<T> extends RichSourceFunction<T> + implements + CheckpointedFunction, + CheckpointListener, + ResultTypeQueryable<T> { + + private static final long serialVersionUID = -5808108641062931623L; + + protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class); + + /** State name of the consumer's partition offset states. */ + public static final String OFFSETS_STATE_NAME = "offset-states"; + + /** State name of the consumer's history records state. */ + public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states"; + + /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */ + public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; + + /** + * The configuration represents the Debezium MySQL Connector uses the legacy implementation or + * not. + */ + public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation"; + + /** The configuration value represents legacy implementation. */ + public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy"; + + // --------------------------------------------------------------------------------------- + // Properties + // --------------------------------------------------------------------------------------- + + /** The schema to convert from Debezium's messages into Flink's objects. */ + private final DebeziumDeserializationSchema<T> deserializer; + + /** User-supplied properties for Kafka. * */ + private final Properties properties; + + /** The specific binlog offset to read from when the first startup. */ + private final @Nullable DebeziumOffset specificOffset; + + /** Data for pending but uncommitted offsets. */ + private final LinkedMap pendingOffsetsToCommit = new LinkedMap(); + + /** Flag indicating whether the Debezium Engine is started. */ + private volatile boolean debeziumStarted = false; + + /** Validator to validate the connected database satisfies the cdc connector's requirements. 
*/ + private final Validator validator; + + // --------------------------------------------------------------------------------------- + // State + // --------------------------------------------------------------------------------------- + + /** + * The offsets to restore to, if the consumer restores state from a checkpoint. + * + * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} + * method. + * + * <p>Using a String because we are encoding the offset state in JSON bytes. + */ + private transient volatile String restoredOffsetState; + + /** Accessor for state in the operator state backend. */ + private transient ListState<byte[]> offsetState; + + /** + * State to store the history records, i.e. schema changes. + * + * @see FlinkDatabaseHistory + * @see FlinkDatabaseSchemaHistory + */ + private transient ListState<String> schemaRecordsState; + + // --------------------------------------------------------------------------------------- + // Worker + // --------------------------------------------------------------------------------------- + + private transient ExecutorService executor; + private transient DebeziumEngine<?> engine; + /** + * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly + * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}. + */ + private transient String engineInstanceName; + + /** Consume the events from the engine and commit the offset to the engine. */ + private transient DebeziumChangeConsumer changeConsumer; + + /** The consumer to fetch records from {@link Handover}. */ + private transient DebeziumChangeFetcher<T> debeziumChangeFetcher; + + /** Buffer the events from the source and record the errors from the debezium. */ + private transient Handover handover; + + // --------------------------------------------------------------------------------------- + + public DebeziumSourceFunction( + DebeziumDeserializationSchema<T> deserializer, + Properties properties, + @Nullable DebeziumOffset specificOffset, + Validator validator) { + this.deserializer = deserializer; + this.properties = properties; + this.specificOffset = specificOffset; + this.validator = validator; + } + + @Override + public void open(Configuration parameters) throws Exception { + validator.validate(); + super.open(parameters); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setNameFormat("debezium-engine").build(); + deserializer.open(); + this.executor = Executors.newSingleThreadExecutor(threadFactory); + this.handover = new Handover(); + this.changeConsumer = new DebeziumChangeConsumer(handover); + } + + // ------------------------------------------------------------------------ + // Checkpoint and restore + // ------------------------------------------------------------------------ + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + OperatorStateStore stateStore = context.getOperatorStateStore(); + this.offsetState = + stateStore.getUnionListState( + new ListStateDescriptor<>( + OFFSETS_STATE_NAME, + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)); + this.schemaRecordsState = + stateStore.getUnionListState( + new ListStateDescriptor<>( + HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO)); + + if (context.isRestored()) { + restoreOffsetState(); + restoreHistoryRecordsState(); + } else { + if (specificOffset != null) { + byte[] serializedOffset = + DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); + 
restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8); + LOG.info( + "Consumer subtask {} starts to read from specified offset {}.", + getRuntimeContext().getIndexOfThisSubtask(), + restoredOffsetState); + } else { + LOG.info( + "Consumer subtask {} has no restore state.", + getRuntimeContext().getIndexOfThisSubtask()); + } + } + } + + private void restoreOffsetState() throws Exception { + for (byte[] serializedOffset : offsetState.get()) { + if (restoredOffsetState == null) { + restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8); + } else { + throw new RuntimeException( + "Debezium Source only support single task, " + + "however, this is restored from multiple tasks."); + } + } + LOG.info( + "Consumer subtask {} restored offset state: {}.", + getRuntimeContext().getIndexOfThisSubtask(), + restoredOffsetState); + } + + private void restoreHistoryRecordsState() throws Exception { + DocumentReader reader = DocumentReader.defaultReader(); + ConcurrentLinkedQueue<SchemaRecord> historyRecords = new ConcurrentLinkedQueue<>(); + int recordsCount = 0; + boolean firstEntry = true; + for (String record : schemaRecordsState.get()) { + if (firstEntry) { + // we store the engine instance name in the first element + this.engineInstanceName = record; + firstEntry = false; + } else { + // Put the records into the state. The database history should read, reorganize and + // register the state. + historyRecords.add(new SchemaRecord(reader.read(record))); + recordsCount++; + } + } + if (engineInstanceName != null) { + registerHistory(engineInstanceName, historyRecords); + } + LOG.info( + "Consumer subtask {} restored history records state: {} with {} records.", + getRuntimeContext().getIndexOfThisSubtask(), + engineInstanceName, + recordsCount); + } + + @Override + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + if (handover.hasError()) { + LOG.debug("snapshotState() called on closed source"); + throw new FlinkRuntimeException( + "Call snapshotState() on closed source, checkpoint failed."); + } else { + snapshotOffsetState(functionSnapshotContext.getCheckpointId()); + snapshotHistoryRecordsState(); + } + } + + private void snapshotOffsetState(long checkpointId) throws Exception { + offsetState.clear(); + + final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher; + + byte[] serializedOffset = null; + if (fetcher == null) { + // the fetcher has not yet been initialized, which means we need to return the + // originally restored offsets + if (restoredOffsetState != null) { + serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8); + } + } else { + byte[] currentState = fetcher.snapshotCurrentState(); + if (currentState == null && restoredOffsetState != null) { + // the fetcher has been initialized, but has not yet received any data, + // which means we need to return the originally restored offsets. 
+ serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8); + } else { + serializedOffset = currentState; + } + } + + if (serializedOffset != null) { + offsetState.add(serializedOffset); + // the map cannot be asynchronously updated, because only one checkpoint call + // can happen on this function at a time: either snapshotState() or + // notifyCheckpointComplete() + pendingOffsetsToCommit.put(checkpointId, serializedOffset); + // truncate the map of pending offsets to commit, to prevent infinite growth + while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { + pendingOffsetsToCommit.remove(0); + } + } + } + + private void snapshotHistoryRecordsState() throws Exception { + schemaRecordsState.clear(); + + if (engineInstanceName != null) { + schemaRecordsState.add(engineInstanceName); + Collection<SchemaRecord> records = retrieveHistory(engineInstanceName); + DocumentWriter writer = DocumentWriter.defaultWriter(); + for (SchemaRecord record : records) { + schemaRecordsState.add(writer.write(record.toDocument())); + } + } + } + + @Override + public void run(SourceContext<T> sourceContext) throws Exception { + properties.setProperty("name", "engine"); + properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName()); + if (restoredOffsetState != null) { + // restored from state + properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState); + } + // DO NOT include schema change, e.g. DDL + properties.setProperty("include.schema.changes", "false"); + // disable the offset flush totally + properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); + // disable tombstones + properties.setProperty("tombstones.on.delete", "false"); + if (engineInstanceName == null) { + // not restore from recovery + engineInstanceName = UUID.randomUUID().toString(); + } + // history instance name to initialize FlinkDatabaseHistory + properties.setProperty( + FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName); + // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't + // continue to read binlog + // see + // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector + // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/ + properties.setProperty("database.history", determineDatabase().getCanonicalName()); + + // we have to filter out the heartbeat events, otherwise the deserializer will fail + String dbzHeartbeatPrefix = + properties.getProperty( + Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), + Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString()); + this.debeziumChangeFetcher = + new DebeziumChangeFetcher<>( + sourceContext, + deserializer, + restoredOffsetState == null, // DB snapshot phase if restore state is null + dbzHeartbeatPrefix, + handover); + + // create the engine with this configuration ... + this.engine = + DebeziumEngine.create(Connect.class) + .using(properties) + .notifying(changeConsumer) + .using(OffsetCommitPolicy.always()) + .using( + (success, message, error) -> { + if (success) { + // Close the handover and prepare to exit. 
+ handover.close(); + } else { + handover.reportError(error); + } + }) + .build(); + + // run the engine asynchronously + executor.execute(engine); + debeziumStarted = true; + + // initialize metrics + // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14 + final Method getMetricGroupMethod = + getRuntimeContext().getClass().getMethod("getMetricGroup"); + getMetricGroupMethod.setAccessible(true); + final MetricGroup metricGroup = + (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext()); + + metricGroup.gauge( + "currentFetchEventTimeLag", + (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay()); + metricGroup.gauge( + "currentEmitEventTimeLag", + (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay()); + metricGroup.gauge( + "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); + + // start the real debezium consumer + debeziumChangeFetcher.runFetchLoop(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + if (!debeziumStarted) { + LOG.debug("notifyCheckpointComplete() called when engine is not started."); + return; + } + + final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher; + if (fetcher == null) { + LOG.debug("notifyCheckpointComplete() called on uninitialized source"); + return; + } + + try { + final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); + if (posInMap == -1) { + LOG.warn( + "Consumer subtask {} received confirmation for unknown checkpoint id {}", + getRuntimeContext().getIndexOfThisSubtask(), + checkpointId); + return; + } + + byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap); + + // remove older checkpoints in map + for (int i = 0; i < posInMap; i++) { + pendingOffsetsToCommit.remove(0); + } + + if (serializedOffsets == null || serializedOffsets.length == 0) { + LOG.debug( + "Consumer subtask {} has empty checkpoint state.", + getRuntimeContext().getIndexOfThisSubtask()); + return; + } + + DebeziumOffset offset = + DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets); + changeConsumer.commitOffset(offset); + } catch (Exception e) { + // ignore exception if we are no longer running + LOG.warn("Ignore error when committing offset to database.", e); + } + } + + @Override + public void cancel() { + // safely and gracefully stop the engine + shutdownEngine(); + if (debeziumChangeFetcher != null) { + debeziumChangeFetcher.close(); + } + } + + @Override + public void close() throws Exception { + cancel(); + + if (executor != null) { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } + + super.close(); + } + + /** Safely and gracefully stop the Debezium engine. 
*/ + private void shutdownEngine() { + try { + if (engine != null) { + engine.close(); + } + } catch (IOException e) { + ExceptionUtils.rethrow(e); + } finally { + if (executor != null) { + executor.shutdownNow(); + } + + debeziumStarted = false; + + if (handover != null) { + handover.close(); + } + } + } + + @Override + public TypeInformation<T> getProducedType() { + return deserializer.getProducedType(); + } + + private Class<?> determineDatabase() { + boolean isCompatibleWithLegacy = + FlinkDatabaseHistory.isCompatible(retrieveHistory(engineInstanceName)); + if (LEGACY_IMPLEMENTATION_VALUE.equals(properties.get(LEGACY_IMPLEMENTATION_KEY))) { + // specifies the legacy implementation but the state may be incompatible + if (isCompatibleWithLegacy) { + return FlinkDatabaseHistory.class; + } else { + throw new IllegalStateException( + "The configured option 'debezium.internal.implementation' is 'legacy', but the state of source is incompatible with this implementation, you should remove the the option."); + } + } else if (FlinkDatabaseSchemaHistory.isCompatible(retrieveHistory(engineInstanceName))) { + // tries the non-legacy first + return FlinkDatabaseSchemaHistory.class; + } else if (isCompatibleWithLegacy) { + // fallback to legacy if possible + return FlinkDatabaseHistory.class; + } else { + // impossible + throw new IllegalStateException("Can't determine which DatabaseHistory to use."); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java index 22e44f046f..d8e4770f79 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java @@ -24,7 +24,6 @@ import org.apache.inlong.sort.base.metric.SourceMetricData; import com.mongodb.client.model.changestream.OperationType; import com.mongodb.internal.HexUtils; import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.AppendMetadataCollector; import com.ververica.cdc.debezium.table.MetadataConverter; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -111,6 +110,8 @@ public class MongoDBConnectorDeserializationSchema implements DebeziumDeserializ */ private final AppendMetadataCollector appendMetadataCollector; + private final MetricOption metricOption; + private SourceMetricData sourceMetricData; public MongoDBConnectorDeserializationSchema( @@ -124,8 +125,13 @@ public class MongoDBConnectorDeserializationSchema implements DebeziumDeserializ this.physicalConverter = createConverter(physicalDataType); this.resultTypeInfo = resultTypeInfo; this.localTimeZone = localTimeZone; + this.metricOption = metricOption; + } + + @Override + public void open() { if (metricOption != null) { - this.sourceMetricData = new SourceMetricData(metricOption); + sourceMetricData = new SourceMetricData(metricOption); } } @@ -151,7 +157,7 @@ public class MongoDBConnectorDeserializationSchema implements DebeziumDeserializ case DELETE: GenericRowData delete = extractRowData(documentKey); 
delete.setRowKind(RowKind.DELETE); - emit(record, delete, out); + emit(record, delete, sourceMetricData == null ? out : new MetricsCollector<>(out, sourceMetricData)); break; case UPDATE: // It’s null if another operation deletes the document diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java new file mode 100644 index 0000000000..f9aab2d54f --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.mongodb; + +import com.mongodb.client.model.changestream.FullDocument; +import com.mongodb.kafka.connect.source.MongoSourceConfig; +import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance; +import com.mongodb.kafka.connect.source.MongoSourceConfig.OutputFormat; +import com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceConnector; +import com.ververica.cdc.debezium.Validator; +import io.debezium.heartbeat.Heartbeat; +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Arrays; +import java.util.List; +import java.util.Locale; +import java.util.Properties; + +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.COLLECTION_INCLUDE_LIST; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME; +import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA; +import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.*; +import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A builder to build a SourceFunction which can read snapshot and continue to consume change stream + * events. + * <p> + * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0 + */ +@PublicEvolving +public class MongoDBSource { + + public static final String FULL_DOCUMENT_UPDATE_LOOKUP = FullDocument.UPDATE_LOOKUP.getValue(); + + public static final String OUTPUT_FORMAT_SCHEMA = + OutputFormat.SCHEMA.name().toLowerCase(Locale.ROOT); + + public static <T> Builder<T> builder() { + return new Builder<>(); + } + + /** Builder class of {@link MongoDBSource}. 
*/ + public static class Builder<T> { + + private String hosts; + private String username; + private String password; + private List<String> databaseList; + private List<String> collectionList; + private String connectionOptions; + private Integer batchSize = BATCH_SIZE.defaultValue(); + private Integer pollAwaitTimeMillis = POLL_AWAIT_TIME_MILLIS.defaultValue(); + private Integer pollMaxBatchSize = POLL_MAX_BATCH_SIZE.defaultValue(); + private Boolean updateLookup = true; + private Boolean copyExisting = COPY_EXISTING.defaultValue(); + private Integer copyExistingMaxThreads; + private Integer copyExistingQueueSize; + private String copyExistingPipeline; + private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue(); + private DebeziumDeserializationSchema<T> deserializer; + + /** The comma-separated list of hostname and port pairs of mongodb servers. */ + public Builder<T> hosts(String hosts) { + this.hosts = hosts; + return this; + } + + /** + * Ampersand (i.e. &) separated MongoDB connection options eg + * replicaSet=test&connectTimeoutMS=300000 + * https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options + */ + public Builder<T> connectionOptions(String connectionOptions) { + this.connectionOptions = connectionOptions; + return this; + } + + /** Name of the database user to be used when connecting to MongoDB. */ + public Builder<T> username(String username) { + this.username = username; + return this; + } + + /** Password to be used when connecting to MongoDB. */ + public Builder<T> password(String password) { + this.password = password; + return this; + } + + /** Regular expressions list that match database names to be monitored. */ + public Builder<T> databaseList(String... databaseList) { + this.databaseList = Arrays.asList(databaseList); + return this; + } + + /** + * Regular expressions that match fully-qualified collection identifiers for collections to + * be monitored. Each identifier is of the form {@code <databaseName>.<collectionName>}. + */ + public Builder<T> collectionList(String... collectionList) { + this.collectionList = Arrays.asList(collectionList); + return this; + } + + /** + * batch.size + * + * <p>The cursor batch size. Default: 1024 + * + * <p>The change stream cursor batch size. Specifies the maximum number of change events to + * return in each batch of the response from the MongoDB cluster. The default is 0 meaning + * it uses the server's default value. Default: 0 + */ + public Builder<T> batchSize(int batchSize) { + checkArgument(batchSize >= 0); + this.batchSize = batchSize; + return this; + } + + /** + * poll.await.time.ms + * + * <p>The amount of time to wait before checking for new results on the change stream. + * Default: 1000 + */ + public Builder<T> pollAwaitTimeMillis(int pollAwaitTimeMillis) { + checkArgument(pollAwaitTimeMillis > 0); + this.pollAwaitTimeMillis = pollAwaitTimeMillis; + return this; + } + + /** + * poll.max.batch.size + * + * <p>Maximum number of change stream documents to include in a single batch when polling + * for new data. This setting can be used to limit the amount of data buffered internally in + * the connector. Default: 1024 + */ + public Builder<T> pollMaxBatchSize(int pollMaxBatchSize) { + checkArgument(pollMaxBatchSize > 0); + this.pollMaxBatchSize = pollMaxBatchSize; + return this; + } + + /** + * change.stream.full.document + * + * <p>Determines what to return for update operations when using a Change Stream. 
When set + * to true, the change stream for partial updates will include both a delta describing the + * changes to the document and a copy of the entire document that was changed from some time + * after the change occurred. Default: true + */ + public Builder<T> updateLookup(boolean updateLookup) { + this.updateLookup = updateLookup; + return this; + } + + /** + * copy.existing + * + * <p>Copy existing data from source collections and convert them to Change Stream events on + * their respective topics. Any changes to the data that occur during the copy process are + * applied once the copy is completed. + */ + public Builder<T> copyExisting(boolean copyExisting) { + this.copyExisting = copyExisting; + return this; + } + + /** + * copy.existing.max.threads + * + * <p>The number of threads to use when performing the data copy. Defaults to the number of + * processors. Default: defaults to the number of processors + */ + public Builder<T> copyExistingMaxThreads(int copyExistingMaxThreads) { + checkArgument(copyExistingMaxThreads > 0); + this.copyExistingMaxThreads = copyExistingMaxThreads; + return this; + } + + /** + * copy.existing.queue.size + * + * <p>The max size of the queue to use when copying data. Default: 10240 + */ + public Builder<T> copyExistingQueueSize(int copyExistingQueueSize) { + checkArgument(copyExistingQueueSize > 0); + this.copyExistingQueueSize = copyExistingQueueSize; + return this; + } + + /** + * copy.existing.pipeline eg. [ { "$match": { "closed": "false" } } ] + * + * <p>An array of JSON objects describing the pipeline operations to run when copying + * existing data. This can improve the use of indexes by the copying manager and make + * copying more efficient. + */ + public Builder<T> copyExistingPipeline(String copyExistingPipeline) { + this.copyExistingPipeline = copyExistingPipeline; + return this; + } + + /** + * heartbeat.interval.ms + * + * <p>The length of time in milliseconds between sending heartbeat messages. Heartbeat + * messages contain the post batch resume token and are sent when no source records have + * been published in the specified interval. This improves the resumability of the connector + * for low volume namespaces. Use 0 to disable. + */ + public Builder<T> heartbeatIntervalMillis(int heartbeatIntervalMillis) { + checkArgument(heartbeatIntervalMillis >= 0); + this.heartbeatIntervalMillis = heartbeatIntervalMillis; + return this; + } + + /** + * The deserializer used to convert from consumed {@link + * org.apache.kafka.connect.source.SourceRecord}. + */ + public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) { + this.deserializer = deserializer; + return this; + } + + /** + * The properties of mongodb kafka connector. 
+ * https://docs.mongodb.com/kafka-connector/current/kafka-source + */ + public DebeziumSourceFunction<T> build() { + Properties props = new Properties(); + + props.setProperty( + "connector.class", MongoDBConnectorSourceConnector.class.getCanonicalName()); + props.setProperty("name", "mongodb_cdc_source"); + + props.setProperty( + MongoSourceConfig.CONNECTION_URI_CONFIG, + String.valueOf( + buildConnectionString(username, password, hosts, connectionOptions))); + + if (databaseList != null) { + props.setProperty(DATABASE_INCLUDE_LIST, String.join(",", databaseList)); + } + + if (collectionList != null) { + props.setProperty(COLLECTION_INCLUDE_LIST, String.join(",", collectionList)); + } + + if (updateLookup) { + props.setProperty( + MongoSourceConfig.FULL_DOCUMENT_CONFIG, FULL_DOCUMENT_UPDATE_LOOKUP); + } + + props.setProperty( + MongoSourceConfig.PUBLISH_FULL_DOCUMENT_ONLY_CONFIG, + String.valueOf(Boolean.FALSE)); + + props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_KEY_CONFIG, OUTPUT_FORMAT_SCHEMA); + props.setProperty(MongoSourceConfig.OUTPUT_FORMAT_VALUE_CONFIG, OUTPUT_FORMAT_SCHEMA); + props.setProperty( + MongoSourceConfig.OUTPUT_SCHEMA_INFER_VALUE_CONFIG, + String.valueOf(Boolean.FALSE)); + props.setProperty(MongoSourceConfig.OUTPUT_SCHEMA_VALUE_CONFIG, OUTPUT_SCHEMA); + + if (batchSize != null) { + props.setProperty(MongoSourceConfig.BATCH_SIZE_CONFIG, String.valueOf(batchSize)); + } + + if (pollAwaitTimeMillis != null) { + props.setProperty( + MongoSourceConfig.POLL_AWAIT_TIME_MS_CONFIG, + String.valueOf(pollAwaitTimeMillis)); + } + + if (pollMaxBatchSize != null) { + props.setProperty( + MongoSourceConfig.POLL_MAX_BATCH_SIZE_CONFIG, + String.valueOf(pollMaxBatchSize)); + } + + if (copyExisting != null) { + props.setProperty( + MongoSourceConfig.COPY_EXISTING_CONFIG, String.valueOf(copyExisting)); + } + + if (copyExistingMaxThreads != null) { + props.setProperty( + MongoSourceConfig.COPY_EXISTING_MAX_THREADS_CONFIG, + String.valueOf(copyExistingMaxThreads)); + } + + if (copyExistingQueueSize != null) { + props.setProperty( + MongoSourceConfig.COPY_EXISTING_QUEUE_SIZE_CONFIG, + String.valueOf(copyExistingQueueSize)); + } + + if (copyExistingPipeline != null) { + props.setProperty( + MongoSourceConfig.COPY_EXISTING_PIPELINE_CONFIG, copyExistingPipeline); + } + + if (heartbeatIntervalMillis != null) { + props.setProperty( + MongoSourceConfig.HEARTBEAT_INTERVAL_MS_CONFIG, + String.valueOf(heartbeatIntervalMillis)); + } + + props.setProperty(MongoSourceConfig.HEARTBEAT_TOPIC_NAME_CONFIG, HEARTBEAT_TOPIC_NAME); + + // Let DebeziumChangeFetcher recognize heartbeat record + props.setProperty(Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), HEARTBEAT_TOPIC_NAME); + + props.setProperty( + MongoSourceConfig.ERRORS_LOG_ENABLE_CONFIG, String.valueOf(Boolean.TRUE)); + props.setProperty( + MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.NONE.value()); + + return new DebeziumSourceFunction<>( + deserializer, props, null, Validator.getDefaultValidator()); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java index a20122f99c..9c417b4edf 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java +++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java @@ -18,11 +18,10 @@ package org.apache.inlong.sort.mongodb; import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.mongodb.source.MongoDBSource; +import org.apache.inlong.sort.mongodb.source.MongoDBSourceBuilder; -import com.ververica.cdc.connectors.mongodb.source.MongoDBSource; -import com.ververica.cdc.connectors.mongodb.source.MongoDBSourceBuilder; import com.ververica.cdc.connectors.mongodb.table.MongoDBReadableMetadata; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.MetadataConverter; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -195,8 +194,8 @@ public class MongoDBTableSource implements ScanTableSource, SupportsReadingMetad return SourceProvider.of(builder.build()); } else { - com.ververica.cdc.connectors.mongodb.MongoDBSource.Builder<RowData> builder = - com.ververica.cdc.connectors.mongodb.MongoDBSource.<RowData>builder() + org.apache.inlong.sort.mongodb.MongoDBSource.Builder<RowData> builder = + org.apache.inlong.sort.mongodb.MongoDBSource.<RowData>builder() .hosts(hosts) .deserializer(deserializer); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSource.java new file mode 100644 index 0000000000..350e588e21 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSource.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.mongodb.source; + +import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema; + +import com.ververica.cdc.connectors.base.config.SourceConfig; +import com.ververica.cdc.connectors.base.dialect.DataSourceDialect; +import com.ververica.cdc.connectors.base.options.StartupMode; +import com.ververica.cdc.connectors.base.source.assigner.HybridSplitAssigner; +import com.ververica.cdc.connectors.base.source.assigner.SplitAssigner; +import com.ververica.cdc.connectors.base.source.assigner.StreamSplitAssigner; +import com.ververica.cdc.connectors.base.source.assigner.state.HybridPendingSplitsState; +import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsState; +import com.ververica.cdc.connectors.base.source.assigner.state.PendingSplitsStateSerializer; +import com.ververica.cdc.connectors.base.source.assigner.state.StreamPendingSplitsState; +import com.ververica.cdc.connectors.base.source.enumerator.IncrementalSourceEnumerator; +import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory; +import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; +import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; +import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer; +import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState; +import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; +import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader; +import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceSplitReader; +import io.debezium.relational.TableId; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.FlinkRuntimeException; + +import java.lang.reflect.Method; +import java.util.List; +import java.util.function.Supplier; + +/** + * The basic source of Incremental Snapshot framework for datasource, it is based on FLIP-27 and + * Watermark Signal Algorithm which supports parallel reading snapshot of table and then continue to + * capture data change by streaming reading. 
+ * <p> + * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0 + */ +@Experimental +public class IncrementalSource<T, C extends SourceConfig> + implements + Source<T, SourceSplitBase, PendingSplitsState>, + ResultTypeQueryable<T> { + + private static final long serialVersionUID = 1L; + + protected final SourceConfig.Factory<C> configFactory; + protected final DataSourceDialect<C> dataSourceDialect; + protected final OffsetFactory offsetFactory; + protected final DebeziumDeserializationSchema<T> deserializationSchema; + protected final SourceSplitSerializer sourceSplitSerializer; + + public IncrementalSource( + SourceConfig.Factory<C> configFactory, + DebeziumDeserializationSchema<T> deserializationSchema, + OffsetFactory offsetFactory, + DataSourceDialect<C> dataSourceDialect) { + this.configFactory = configFactory; + this.deserializationSchema = deserializationSchema; + this.offsetFactory = offsetFactory; + this.dataSourceDialect = dataSourceDialect; + this.sourceSplitSerializer = + new SourceSplitSerializer() { + + @Override + public OffsetFactory getOffsetFactory() { + return offsetFactory; + } + }; + } + + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + @Override + public IncrementalSourceReader<T, C> createReader(SourceReaderContext readerContext) + throws Exception { + // create source config for the given subtask (e.g. unique server id) + C sourceConfig = configFactory.create(readerContext.getIndexOfSubtask()); + FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue = + new FutureCompletingBlockingQueue<>(); + + // Forward compatible with flink 1.13 + final Method metricGroupMethod = readerContext.getClass().getMethod("metricGroup"); + metricGroupMethod.setAccessible(true); + final MetricGroup metricGroup = (MetricGroup) metricGroupMethod.invoke(readerContext); + final SourceReaderMetrics sourceReaderMetrics = new SourceReaderMetrics(metricGroup); + + sourceReaderMetrics.registerMetrics(); + Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier = + () -> new IncrementalSourceSplitReader<>( + readerContext.getIndexOfSubtask(), dataSourceDialect, sourceConfig); + return new IncrementalSourceReader<>( + elementsQueue, + splitReaderSupplier, + createRecordEmitter(sourceConfig, sourceReaderMetrics), + readerContext.getConfiguration(), + readerContext, + sourceConfig, + sourceSplitSerializer, + dataSourceDialect); + } + + @Override + public SplitEnumerator<SourceSplitBase, PendingSplitsState> createEnumerator( + SplitEnumeratorContext<SourceSplitBase> enumContext) { + C sourceConfig = configFactory.create(0); + final SplitAssigner splitAssigner; + if (sourceConfig.getStartupOptions().startupMode == StartupMode.INITIAL) { + try { + final List<TableId> remainingTables = + dataSourceDialect.discoverDataCollections(sourceConfig); + boolean isTableIdCaseSensitive = + dataSourceDialect.isDataCollectionIdCaseSensitive(sourceConfig); + splitAssigner = + new HybridSplitAssigner<>( + sourceConfig, + enumContext.currentParallelism(), + remainingTables, + isTableIdCaseSensitive, + dataSourceDialect, + offsetFactory); + } catch (Exception e) { + throw new FlinkRuntimeException( + "Failed to discover captured tables for enumerator", e); + } + } else { + splitAssigner = new StreamSplitAssigner(sourceConfig, dataSourceDialect, offsetFactory); + } + + return new IncrementalSourceEnumerator(enumContext, sourceConfig, splitAssigner); + } + + @Override + public SplitEnumerator<SourceSplitBase, PendingSplitsState> 
restoreEnumerator( + SplitEnumeratorContext<SourceSplitBase> enumContext, PendingSplitsState checkpoint) { + C sourceConfig = configFactory.create(0); + + final SplitAssigner splitAssigner; + if (checkpoint instanceof HybridPendingSplitsState) { + splitAssigner = + new HybridSplitAssigner<>( + sourceConfig, + enumContext.currentParallelism(), + (HybridPendingSplitsState) checkpoint, + dataSourceDialect, + offsetFactory); + } else if (checkpoint instanceof StreamPendingSplitsState) { + splitAssigner = + new StreamSplitAssigner( + sourceConfig, + (StreamPendingSplitsState) checkpoint, + dataSourceDialect, + offsetFactory); + } else { + throw new UnsupportedOperationException( + "Unsupported restored PendingSplitsState: " + checkpoint); + } + return new IncrementalSourceEnumerator(enumContext, sourceConfig, splitAssigner); + } + + @Override + public SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() { + return sourceSplitSerializer; + } + + @Override + public SimpleVersionedSerializer<PendingSplitsState> getEnumeratorCheckpointSerializer() { + SourceSplitSerializer sourceSplitSerializer = (SourceSplitSerializer) getSplitSerializer(); + return new PendingSplitsStateSerializer(sourceSplitSerializer); + } + + @Override + public TypeInformation<T> getProducedType() { + return deserializationSchema.getProducedType(); + } + + protected RecordEmitter<SourceRecords, T, SourceSplitState> createRecordEmitter( + SourceConfig sourceConfig, SourceReaderMetrics sourceReaderMetrics) { + return new IncrementalSourceRecordEmitter<>( + deserializationSchema, + sourceReaderMetrics, + sourceConfig.isIncludeSchemaChanges(), + offsetFactory); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSourceRecordEmitter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSourceRecordEmitter.java new file mode 100644 index 0000000000..f8747a04d7 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSourceRecordEmitter.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.mongodb.source; + +import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema; + +import com.ververica.cdc.connectors.base.source.meta.offset.Offset; +import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory; +import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; +import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState; +import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; +import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader; +import com.ververica.cdc.debezium.history.FlinkJsonTableChangeSerializer; +import io.debezium.document.Array; +import io.debezium.relational.history.HistoryRecord; +import io.debezium.relational.history.TableChanges; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.util.Collector; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isHighWatermarkEvent; +import static com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isWatermarkEvent; +import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.*; + +/** + * The {@link RecordEmitter} implementation for {@link IncrementalSourceReader}. + * + * <p>The {@link RecordEmitter} buffers the snapshot records of split and call the stream reader to + * emit records rather than emit the records directly. + * <p> + * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0 + */ +public class IncrementalSourceRecordEmitter<T> + implements + RecordEmitter<SourceRecords, T, SourceSplitState> { + + private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceRecordEmitter.class); + private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = + new FlinkJsonTableChangeSerializer(); + + protected final DebeziumDeserializationSchema<T> debeziumDeserializationSchema; + protected final SourceReaderMetrics sourceReaderMetrics; + protected final boolean includeSchemaChanges; + protected final OutputCollector<T> outputCollector; + protected final OffsetFactory offsetFactory; + + public IncrementalSourceRecordEmitter( + DebeziumDeserializationSchema<T> debeziumDeserializationSchema, + SourceReaderMetrics sourceReaderMetrics, + boolean includeSchemaChanges, + OffsetFactory offsetFactory) { + this.debeziumDeserializationSchema = debeziumDeserializationSchema; + this.sourceReaderMetrics = sourceReaderMetrics; + this.includeSchemaChanges = includeSchemaChanges; + this.outputCollector = new OutputCollector<>(); + this.offsetFactory = offsetFactory; + } + + @Override + public void emitRecord( + SourceRecords sourceRecords, SourceOutput<T> output, SourceSplitState splitState) + throws Exception { + final Iterator<SourceRecord> elementIterator = sourceRecords.iterator(); + while (elementIterator.hasNext()) { + processElement(elementIterator.next(), output, splitState); + } + } + + protected void processElement( + SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) + throws Exception { + if (isWatermarkEvent(element)) { + Offset watermark = getWatermark(element); + if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) { + 
splitState.asSnapshotSplitState().setHighWatermark(watermark); + } + } else if (isSchemaChangeEvent(element) && splitState.isStreamSplitState()) { + HistoryRecord historyRecord = getHistoryRecord(element); + Array tableChanges = + historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES); + TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true); + for (TableChanges.TableChange tableChange : changes) { + splitState.asStreamSplitState().recordSchema(tableChange.getId(), tableChange); + } + if (includeSchemaChanges) { + emitElement(element, output); + } + } else if (isDataChangeRecord(element)) { + if (splitState.isStreamSplitState()) { + Offset position = getOffsetPosition(element); + splitState.asStreamSplitState().setStartingOffset(position); + } + reportMetrics(element); + emitElement(element, output); + } else { + // unknown element + LOG.info("Meet unknown element {}, just skip.", element); + } + } + + private Offset getWatermark(SourceRecord watermarkEvent) { + return getOffsetPosition(watermarkEvent.sourceOffset()); + } + + public Offset getOffsetPosition(SourceRecord dataRecord) { + return getOffsetPosition(dataRecord.sourceOffset()); + } + + public Offset getOffsetPosition(Map<String, ?> offset) { + Map<String, String> offsetStrMap = new HashMap<>(); + for (Map.Entry<String, ?> entry : offset.entrySet()) { + offsetStrMap.put( + entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString()); + } + return offsetFactory.newOffset(offsetStrMap); + } + + protected void emitElement(SourceRecord element, SourceOutput<T> output) throws Exception { + outputCollector.output = output; + debeziumDeserializationSchema.deserialize(element, outputCollector); + } + + protected void reportMetrics(SourceRecord element) { + long now = System.currentTimeMillis(); + // record the latest process time + sourceReaderMetrics.recordProcessTime(now); + Long messageTimestamp = getMessageTimestamp(element); + + if (messageTimestamp != null && messageTimestamp > 0L) { + // report fetch delay + Long fetchTimestamp = getFetchTimestamp(element); + if (fetchTimestamp != null) { + sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp); + } + // report emit delay + sourceReaderMetrics.recordEmitDelay(now - messageTimestamp); + } + } + + private static class OutputCollector<T> implements Collector<T> { + + private SourceOutput<T> output; + + @Override + public void collect(T record) { + output.collect(record); + } + + @Override + public void close() { + // do nothing + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBRecordEmitter.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBRecordEmitter.java new file mode 100644 index 0000000000..392b39fedd --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBRecordEmitter.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.mongodb.source; + +import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema; + +import com.ververica.cdc.connectors.base.source.meta.offset.Offset; +import com.ververica.cdc.connectors.base.source.meta.offset.OffsetFactory; +import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState; +import com.ververica.cdc.connectors.base.source.meta.split.StreamSplitState; +import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; +import com.ververica.cdc.connectors.base.source.reader.IncrementalSourceReader; +import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffset; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.kafka.connect.source.SourceRecord; +import org.bson.BsonDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isHighWatermarkEvent; +import static com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent.isWatermarkEvent; +import static com.ververica.cdc.connectors.mongodb.source.utils.MongoRecordUtils.*; + +/** + * The {@link RecordEmitter} implementation for {@link IncrementalSourceReader}. + * + * <p>The {@link RecordEmitter} buffers the snapshot records of split and call the stream reader to + * emit records rather than emit the records directly. 
+ * <p> + * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0 + */ +public final class MongoDBRecordEmitter<T> extends IncrementalSourceRecordEmitter<T> { + + private static final Logger LOG = LoggerFactory.getLogger(MongoDBRecordEmitter.class); + + public MongoDBRecordEmitter( + DebeziumDeserializationSchema<T> deserializationSchema, + SourceReaderMetrics sourceReaderMetrics, + OffsetFactory offsetFactory) { + super(deserializationSchema, sourceReaderMetrics, false, offsetFactory); + } + + @Override + protected void processElement( + SourceRecord element, SourceOutput<T> output, SourceSplitState splitState) + throws Exception { + if (isWatermarkEvent(element)) { + Offset watermark = getOffsetPosition(element); + if (isHighWatermarkEvent(element) && splitState.isSnapshotSplitState()) { + splitState.asSnapshotSplitState().setHighWatermark(watermark); + } + } else if (isHeartbeatEvent(element)) { + if (splitState.isStreamSplitState()) { + updatePositionForStreamSplit(element, splitState); + } + } else if (isDataChangeRecord(element)) { + if (splitState.isStreamSplitState()) { + updatePositionForStreamSplit(element, splitState); + } + reportMetrics(element); + emitElement(element, output); + } else { + // unknown element + LOG.info("Meet unknown element {}, just skip.", element); + } + } + + private void updatePositionForStreamSplit(SourceRecord element, SourceSplitState splitState) { + BsonDocument resumeToken = getResumeToken(element); + StreamSplitState streamSplitState = splitState.asStreamSplitState(); + ChangeStreamOffset offset = (ChangeStreamOffset) streamSplitState.getStartingOffset(); + if (offset != null) { + offset.updatePosition(resumeToken); + } + splitState.asStreamSplitState().setStartingOffset(offset); + } + + @Override + protected void reportMetrics(SourceRecord element) { + long now = System.currentTimeMillis(); + // record the latest process time + sourceReaderMetrics.recordProcessTime(now); + Long messageTimestamp = getMessageTimestamp(element); + + if (messageTimestamp != null && messageTimestamp > 0L) { + // report fetch delay + Long fetchTimestamp = getFetchTimestamp(element); + if (fetchTimestamp != null && fetchTimestamp >= messageTimestamp) { + sourceReaderMetrics.recordFetchDelay(fetchTimestamp - messageTimestamp); + } + // report emit delay + sourceReaderMetrics.recordEmitDelay(now - messageTimestamp); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSource.java new file mode 100644 index 0000000000..32a149a229 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSource.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.mongodb.source; + +import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema; + +import com.ververica.cdc.connectors.base.config.SourceConfig; +import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; +import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState; +import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; +import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig; +import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory; +import com.ververica.cdc.connectors.mongodb.source.dialect.MongoDBDialect; +import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffsetFactory; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.base.source.reader.RecordEmitter; + +/** + * The MongoDB CDC Source based on FLIP-27 which supports parallel reading snapshot of collection + * and then continue to capture data change from change stream. + * + * <pre> + * 1. The source supports parallel capturing database(s) or collection(s) change. + * 2. The source supports checkpoint in split level when read snapshot data. + * 3. The source doesn't need apply any lock of MongoDB. + * </pre> + * + * <pre>{@code + * MongoDBSource + * .<String>builder() + * .hosts("localhost:27017") + * .databaseList("mydb") + * .collectionList("mydb.users") + * .username(username) + * .password(password) + * .deserializer(new JsonDebeziumDeserializationSchema()) + * .build(); + * }</pre> + * + * <p>See {@link MongoDBSourceBuilder} for more details. + * + * @param <T> the output type of the source. + * <p> + * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0 + */ +@Internal +@Experimental +public class MongoDBSource<T> extends IncrementalSource<T, MongoDBSourceConfig> { + + private static final long serialVersionUID = 1L; + + MongoDBSource( + MongoDBSourceConfigFactory configFactory, + DebeziumDeserializationSchema<T> deserializationSchema) { + super( + configFactory, + deserializationSchema, + new ChangeStreamOffsetFactory(), + new MongoDBDialect()); + } + + /** + * Get a MongoDBSourceBuilder to build a {@link MongoDBSource}. + * + * @return a MongoDB parallel source builder. 
+ */ + @PublicEvolving + public static <T> MongoDBSourceBuilder<T> builder() { + return new MongoDBSourceBuilder<>(); + } + + @Override + protected RecordEmitter<SourceRecords, T, SourceSplitState> createRecordEmitter( + SourceConfig sourceConfig, SourceReaderMetrics sourceReaderMetrics) { + return new MongoDBRecordEmitter<>( + deserializationSchema, sourceReaderMetrics, offsetFactory); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java new file mode 100644 index 0000000000..a95f238a0b --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.mongodb.source; + +import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema; + +import com.ververica.cdc.connectors.base.options.StartupOptions; +import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.PublicEvolving; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The builder class for {@link MongoDBSource} to make it easier for the users to construct a {@link + * MongoDBSource}. + * + * <pre>{@code + * MongoDBSource + * .<String>builder() + * .hosts("localhost:27017") + * .databaseList("mydb") + * .collectionList("mydb.users") + * .username(username) + * .password(password) + * .deserializer(new JsonDebeziumDeserializationSchema()) + * .build(); + * }</pre> + * + * <p>Check the Java docs of each individual method to learn more about the settings to build a + * {@link MongoDBSource}. + * <p> + * Copy from com.ververica:flink-connector-mongodb-cdc-2.3.0 + */ +@Experimental +@PublicEvolving +public class MongoDBSourceBuilder<T> { + + private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory(); + private DebeziumDeserializationSchema<T> deserializer; + + /** The comma-separated list of hostname and port pairs of mongodb servers. */ + public MongoDBSourceBuilder<T> hosts(String hosts) { + this.configFactory.hosts(hosts); + return this; + } + + /** + * Ampersand (i.e. 
&) separated MongoDB connection options eg + * replicaSet=test&connectTimeoutMS=300000 + * https://docs.mongodb.com/manual/reference/connection-string/#std-label-connections-connection-options + */ + public MongoDBSourceBuilder<T> connectionOptions(String connectionOptions) { + this.configFactory.connectionOptions(connectionOptions); + return this; + } + + /** Name of the database user to be used when connecting to MongoDB. */ + public MongoDBSourceBuilder<T> username(String username) { + this.configFactory.username(username); + return this; + } + + /** Password to be used when connecting to MongoDB. */ + public MongoDBSourceBuilder<T> password(String password) { + this.configFactory.password(password); + return this; + } + + /** Regular expressions list that match database names to be monitored. */ + public MongoDBSourceBuilder<T> databaseList(String... databases) { + this.configFactory.databaseList(databases); + return this; + } + + /** + * Regular expressions that match fully-qualified collection identifiers for collections to be + * monitored. Each identifier is of the form {@code <databaseName>.<collectionName>}. + */ + public MongoDBSourceBuilder<T> collectionList(String... collections) { + this.configFactory.collectionList(collections); + return this; + } + + /** + * batch.size + * + * <p>The cursor batch size. Default: 1024 + */ + public MongoDBSourceBuilder<T> batchSize(int batchSize) { + this.configFactory.batchSize(batchSize); + return this; + } + + /** + * poll.await.time.ms + * + * <p>The amount of time to wait before checking for new results on the change stream. Default: + * 1000 + */ + public MongoDBSourceBuilder<T> pollAwaitTimeMillis(int pollAwaitTimeMillis) { + checkArgument(pollAwaitTimeMillis > 0); + this.configFactory.pollAwaitTimeMillis(pollAwaitTimeMillis); + return this; + } + + /** + * poll.max.batch.size + * + * <p>Maximum number of change stream documents to include in a single batch when polling for + * new data. This setting can be used to limit the amount of data buffered internally in the + * connector. Default: 1024 + */ + public MongoDBSourceBuilder<T> pollMaxBatchSize(int pollMaxBatchSize) { + this.configFactory.pollMaxBatchSize(pollMaxBatchSize); + return this; + } + + /** + * copy.existing + * + * <p>Copy existing data from source collections and convert them to Change Stream events on + * their respective topics. Any changes to the data that occur during the copy process are + * applied once the copy is completed. + */ + public MongoDBSourceBuilder<T> copyExisting(boolean copyExisting) { + if (copyExisting) { + this.configFactory.startupOptions(StartupOptions.initial()); + } else { + this.configFactory.startupOptions(StartupOptions.latest()); + } + return this; + } + + /** + * heartbeat.interval.ms + * + * <p>The length of time in milliseconds between sending heartbeat messages. Heartbeat messages + * contain the post batch resume token and are sent when no source records have been published + * in the specified interval. This improves the resumability of the connector for low volume + * namespaces. Use 0 to disable. + */ + public MongoDBSourceBuilder<T> heartbeatIntervalMillis(int heartbeatIntervalMillis) { + this.configFactory.heartbeatIntervalMillis(heartbeatIntervalMillis); + return this; + } + + /** + * scan.incremental.snapshot.chunk.size.mb + * + * <p>The chunk size mb of incremental snapshot. Default: 64mb. 
+ */ + public MongoDBSourceBuilder<T> splitSizeMB(int splitSizeMB) { + this.configFactory.splitSizeMB(splitSizeMB); + return this; + } + + /** + * The group size of split meta, if the meta size exceeds the group size, the meta will be + * divided into multiple groups. + */ + public MongoDBSourceBuilder<T> splitMetaGroupSize(int splitMetaGroupSize) { + this.configFactory.splitMetaGroupSize(splitMetaGroupSize); + return this; + } + + /** + * The deserializer used to convert from consumed {@link + * org.apache.kafka.connect.source.SourceRecord}. + */ + public MongoDBSourceBuilder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) { + this.deserializer = deserializer; + return this; + } + + /** + * Build the {@link MongoDBSource}. + * + * @return a MongoDBParallelSource with the settings made for this builder. + */ + public MongoDBSource<T> build() { + configFactory.validate(); + return new MongoDBSource<>(configFactory, checkNotNull(deserializer)); + } +} diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 261b1c9f97..4af3c47722 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -857,8 +857,18 @@ Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18 (Please note that the software have been modified.) License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE -1.3.23 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodbMongoDBTableSource.java +1.3.23 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSource.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/IncrementalSourceRecordEmitter.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBRecordEmitter.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSource.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/source/MongoDBSourceBuilder.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumChangeFetcher.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumDeserializationSchema.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/DebeziumSourceFunction.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBSource.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBTableSource.java Source : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software 
have been modified.) License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
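
For reviewers who want to exercise the relocated classes, here is a minimal end-to-end sketch of how they fit together in a Flink 1.15 job. It is not part of the commit: the host, database, credentials, class name, and the trivial ToJsonString deserializer are illustrative assumptions, and the sketch assumes the relocated org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema keeps the upstream two-method shape (deserialize plus getProducedType). The table connector path would instead plug in MongoDBConnectorDeserializationSchema, which is what carries the audit operator this change makes serializable.

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
    import org.apache.inlong.sort.mongodb.source.MongoDBSource;
    import org.apache.kafka.connect.source.SourceRecord;

    public class MongoDbCdcSmokeTest {

        /** Trivial deserializer used only for this sketch; it just stringifies the Debezium value. */
        private static class ToJsonString implements DebeziumDeserializationSchema<String> {

            @Override
            public void deserialize(SourceRecord record, Collector<String> out) {
                // Emitting the raw record value is enough to verify the source builds, serializes, and runs.
                out.collect(String.valueOf(record.value()));
            }

            @Override
            public TypeInformation<String> getProducedType() {
                return Types.STRING;
            }
        }

        public static void main(String[] args) throws Exception {
            MongoDBSource<String> source =
                    MongoDBSource.<String>builder()
                            .hosts("localhost:27017")       // placeholder host
                            .databaseList("mydb")           // placeholder database
                            .collectionList("mydb.users")   // placeholder collection
                            .username("flinkuser")          // placeholder credentials
                            .password("flinkpw")
                            .deserializer(new ToJsonString())
                            .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(3_000L);
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "mongodb-cdc-source").print();
            env.execute("MongoDB CDC smoke test");
        }
    }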