This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 36a2032acc [INLONG-10339][Sort] Fix PostgreSQL AuditOperator not serialized (#10344) 36a2032acc is described below commit 36a2032acc80d80ff044a7ab811e12fef68bd4d8 Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com> AuthorDate: Wed Jun 5 11:37:10 2024 +0800 [INLONG-10339][Sort] Fix PostgreSQL AuditOperator not serialized (#10344) --- .../inlong/sort/postgre/DebeziumChangeFetcher.java | 322 ++++++++++++ .../postgre/DebeziumDeserializationSchema.java | 42 ++ .../sort/postgre/DebeziumSourceFunction.java | 579 +++++++++++++++++++++ .../inlong/sort/postgre/PostgreSQLSource.java | 184 +++++++ .../inlong/sort/postgre/PostgreSQLTableSource.java | 9 +- .../postgre/RowDataDebeziumDeserializeSchema.java | 27 +- licenses/inlong-sort-connectors/LICENSE | 12 +- 7 files changed, 1157 insertions(+), 18 deletions(-) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumChangeFetcher.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumChangeFetcher.java new file mode 100644 index 0000000000..ef9251f79d --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumChangeFetcher.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.postgre; + +import com.ververica.cdc.debezium.internal.DebeziumOffset; +import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer; +import com.ververica.cdc.debezium.internal.Handover; +import io.debezium.connector.SnapshotRecord; +import io.debezium.data.Envelope; +import io.debezium.engine.ChangeEvent; +import io.debezium.engine.DebeziumEngine; +import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.annotation.Internal; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.util.Collector; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.source.SourceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.List; +import java.util.Map; +import java.util.Queue; + +/** + * A Handler that convert change messages from {@link DebeziumEngine} to data in Flink. Considering + * Debezium in different mode has different strategies to hold the lock, e.g. snapshot, the handler + * also needs different strategy. In snapshot phase, the handler needs to hold the lock until the + * snapshot finishes. 
But in non-snapshot phase, the handler only needs to hold the lock when + * emitting the records. + * + * @param <T> The type of elements produced by the handler. + * <p> + * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 + */ +@Internal +public class DebeziumChangeFetcher<T> { + + private static final Logger LOG = LoggerFactory.getLogger(DebeziumChangeFetcher.class); + + private final SourceFunction.SourceContext<T> sourceContext; + + /** + * The lock that guarantees that record emission and state updates are atomic, from the view of + * taking a checkpoint. + */ + private final Object checkpointLock; + + /** The schema to convert from Debezium's messages into Flink's objects. */ + private final DebeziumDeserializationSchema<T> deserialization; + + /** A collector to emit records in batch (bundle). */ + private final DebeziumCollector debeziumCollector; + + private final DebeziumOffset debeziumOffset; + + private final DebeziumOffsetSerializer stateSerializer; + + private final String heartbeatTopicPrefix; + + private boolean isInDbSnapshotPhase; + + private final Handover handover; + + private volatile boolean isRunning = true; + + // --------------------------------------------------------------------------------------- + // Metrics + // --------------------------------------------------------------------------------------- + + /** Timestamp of change event. If the event is a snapshot event, the timestamp is 0L. */ + private volatile long messageTimestamp = 0L; + + /** The last record processing time. */ + private volatile long processTime = 0L; + + /** + * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the + * record fetched into the source operator. + */ + private volatile long fetchDelay = 0L; + + /** + * emitDelay = EmitTime - messageTimestamp, where the EmitTime is the time the record leaves the + * source operator. + */ + private volatile long emitDelay = 0L; + + // ------------------------------------------------------------------------ + + public DebeziumChangeFetcher( + SourceFunction.SourceContext<T> sourceContext, + DebeziumDeserializationSchema<T> deserialization, + boolean isInDbSnapshotPhase, + String heartbeatTopicPrefix, + Handover handover) { + this.sourceContext = sourceContext; + this.checkpointLock = sourceContext.getCheckpointLock(); + this.deserialization = deserialization; + this.isInDbSnapshotPhase = isInDbSnapshotPhase; + this.heartbeatTopicPrefix = heartbeatTopicPrefix; + this.debeziumCollector = new DebeziumCollector(); + this.debeziumOffset = new DebeziumOffset(); + this.stateSerializer = DebeziumOffsetSerializer.INSTANCE; + this.handover = handover; + } + + /** + * Take a snapshot of the Debezium handler state. + * + * <p>Important: This method must be called under the checkpoint lock. + */ + public byte[] snapshotCurrentState() throws Exception { + // this method assumes that the checkpoint lock is held + assert Thread.holdsLock(checkpointLock); + if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) { + return null; + } + + return stateSerializer.serialize(debeziumOffset); + } + + /** + * Process change messages from the {@link Handover} and collect the processed messages by + * {@link Collector}. 
+ */ + public void runFetchLoop() throws Exception { + try { + // begin snapshot database phase + if (isInDbSnapshotPhase) { + List<ChangeEvent<SourceRecord, SourceRecord>> events = handover.pollNext(); + + synchronized (checkpointLock) { + LOG.info( + "Database snapshot phase can't perform checkpoint, acquired Checkpoint lock."); + handleBatch(events); + while (isRunning && isInDbSnapshotPhase) { + handleBatch(handover.pollNext()); + } + } + LOG.info("Received record from streaming binlog phase, released checkpoint lock."); + } + + // begin streaming binlog phase + while (isRunning) { + // If the handover is closed or has errors, exit. + // If there is no streaming phase, the handover will be closed by the engine. + handleBatch(handover.pollNext()); + } + } catch (Handover.ClosedException e) { + // ignore + } catch (RetriableException e) { + // Retriable exception should be ignored by DebeziumChangeFetcher, + // refer https://issues.redhat.com/browse/DBZ-2531 for more information. + // Because Retriable exception is ignored by the DebeziumEngine and + // the retry is handled in io.debezium.connector.common.BaseSourceTask.poll() + LOG.info( + "Ignore the RetriableException, the underlying DebeziumEngine will restart automatically", + e); + } + } + + public void close() { + isRunning = false; + handover.close(); + } + + // --------------------------------------------------------------------------------------- + // Metric getter + // --------------------------------------------------------------------------------------- + + /** + * The metric indicates delay from data generation to entry into the system. + * + * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is + * unavailable. + */ + public long getFetchDelay() { + return fetchDelay; + } + + /** + * The metric indicates delay from data generation to leaving the source operator. + * + * <p>Note: the metric is available during the binlog phase. Use 0 to indicate the metric is + * unavailable. + */ + public long getEmitDelay() { + return emitDelay; + } + + public long getIdleTime() { + return System.currentTimeMillis() - processTime; + } + + // --------------------------------------------------------------------------------------- + // Helper + // --------------------------------------------------------------------------------------- + + private void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> changeEvents) + throws Exception { + if (CollectionUtils.isEmpty(changeEvents)) { + return; + } + this.processTime = System.currentTimeMillis(); + + for (ChangeEvent<SourceRecord, SourceRecord> event : changeEvents) { + SourceRecord record = event.value(); + updateMessageTimestamp(record); + fetchDelay = isInDbSnapshotPhase ? 0L : processTime - messageTimestamp; + + if (isHeartbeatEvent(record)) { + // keep offset update + synchronized (checkpointLock) { + debeziumOffset.setSourcePartition(record.sourcePartition()); + debeziumOffset.setSourceOffset(record.sourceOffset()); + } + // drop heartbeat events + continue; + } + + deserialization.deserialize(record, debeziumCollector); + + if (!isSnapshotRecord(record)) { + LOG.debug("Snapshot phase finishes."); + isInDbSnapshotPhase = false; + } + + // emit the actual records. 
this also updates offset state atomically + emitRecordsUnderCheckpointLock( + debeziumCollector.records, record.sourcePartition(), record.sourceOffset()); + } + } + + private void emitRecordsUnderCheckpointLock( + Queue<T> records, Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) { + // Emit the records. Use the checkpoint lock to guarantee + // atomicity of record emission and offset state update. + // The synchronized checkpointLock is reentrant. It's safe to sync again in snapshot mode. + synchronized (checkpointLock) { + T record; + while ((record = records.poll()) != null) { + emitDelay = + isInDbSnapshotPhase ? 0L : System.currentTimeMillis() - messageTimestamp; + sourceContext.collect(record); + } + // update offset to state + debeziumOffset.setSourcePartition(sourcePartition); + debeziumOffset.setSourceOffset(sourceOffset); + } + } + + private void updateMessageTimestamp(SourceRecord record) { + Schema schema = record.valueSchema(); + Struct value = (Struct) record.value(); + if (schema.field(Envelope.FieldName.SOURCE) == null) { + return; + } + + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + if (source.schema().field(Envelope.FieldName.TIMESTAMP) == null) { + return; + } + + Long tsMs = source.getInt64(Envelope.FieldName.TIMESTAMP); + if (tsMs != null) { + this.messageTimestamp = tsMs; + } + } + + private boolean isHeartbeatEvent(SourceRecord record) { + String topic = record.topic(); + return topic != null && topic.startsWith(heartbeatTopicPrefix); + } + + private boolean isSnapshotRecord(SourceRecord record) { + Struct value = (Struct) record.value(); + if (value != null) { + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source); + // even if it is the last record of snapshot, i.e. SnapshotRecord.LAST + // we can still recover from checkpoint and continue to read the binlog, + // because the checkpoint contains binlog position + return SnapshotRecord.TRUE == snapshotRecord; + } + return false; + } + + // --------------------------------------------------------------------------------------- + + private class DebeziumCollector implements Collector<T> { + + private final Queue<T> records = new ArrayDeque<>(); + + @Override + public void collect(T record) { + records.add(record); + } + + @Override + public void close() { + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumDeserializationSchema.java new file mode 100644 index 0000000000..31dd750341 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumDeserializationSchema.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.postgre; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; +import org.apache.kafka.connect.source.SourceRecord; + +import java.io.Serializable; + +/** + * The deserialization schema describes how to turn the Debezium SourceRecord into data types + * (Java/Scala objects) that are processed by Flink. + * + * @param <T> The type created by the deserialization schema. + * <p> + * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 + */ +@PublicEvolving +public interface DebeziumDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + + void open(); + + /** Deserialize the Debezium record, it is represented in Kafka {@link SourceRecord}. */ + void deserialize(SourceRecord record, Collector<T> out) throws Exception; +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java new file mode 100644 index 0000000000..27b9988dd0 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java @@ -0,0 +1,579 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.postgre; + +import com.ververica.cdc.debezium.Validator; +import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer; +import com.ververica.cdc.debezium.internal.DebeziumOffset; +import com.ververica.cdc.debezium.internal.DebeziumOffsetSerializer; +import com.ververica.cdc.debezium.internal.FlinkDatabaseHistory; +import com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory; +import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore; +import com.ververica.cdc.debezium.internal.Handover; +import com.ververica.cdc.debezium.internal.SchemaRecord; +import io.debezium.document.DocumentReader; +import io.debezium.document.DocumentWriter; +import io.debezium.embedded.Connect; +import io.debezium.engine.DebeziumEngine; +import io.debezium.engine.spi.OffsetCommitPolicy; +import io.debezium.heartbeat.Heartbeat; +import org.apache.commons.collections.map.LinkedMap; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.state.CheckpointListener; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.FlinkRuntimeException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; + +import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; +import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; + +/** + * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data + * from databases into Flink. + * + * <p>There are two workers during the runtime. One worker periodically pulls records from the + * database and pushes the records into the {@link Handover}. The other worker consumes the records + * from the {@link Handover} and convert the records to the data in Flink style. The reason why + * don't use one workers is because debezium has different behaviours in snapshot phase and + * streaming phase. + * + * <p>Here we use the {@link Handover} as the buffer to submit data from the producer to the + * consumer. 
Because the two threads don't communicate to each other directly, the error reporting + * also relies on {@link Handover}. When the engine gets errors, the engine uses the {@link + * DebeziumEngine.CompletionCallback} to report errors to the {@link Handover} and wakes up the + * consumer to check the error. However, the source function just closes the engine and wakes up the + * producer if the error is from the Flink side. + * + * <p>If the execution is canceled or finish(only snapshot phase), the exit logic is as same as the + * logic in the error reporting. + * + * <p>The source function participates in checkpointing and guarantees that no data is lost during a + * failure, and that the computation processes elements "exactly once". + * + * <p>Note: currently, the source function can't run in multiple parallel instances. + * + * <p> + * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 + */ +@PublicEvolving +public class DebeziumSourceFunction<T> extends RichSourceFunction<T> + implements + CheckpointedFunction, + CheckpointListener, + ResultTypeQueryable<T> { + + private static final long serialVersionUID = -5808108641062931623L; + + protected static final Logger LOG = LoggerFactory.getLogger(DebeziumSourceFunction.class); + + /** State name of the consumer's partition offset states. */ + public static final String OFFSETS_STATE_NAME = "offset-states"; + + /** State name of the consumer's history records state. */ + public static final String HISTORY_RECORDS_STATE_NAME = "history-records-states"; + + /** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */ + public static final int MAX_NUM_PENDING_CHECKPOINTS = 100; + + /** + * The configuration represents the Debezium MySQL Connector uses the legacy implementation or + * not. + */ + public static final String LEGACY_IMPLEMENTATION_KEY = "internal.implementation"; + + /** The configuration value represents legacy implementation. */ + public static final String LEGACY_IMPLEMENTATION_VALUE = "legacy"; + + // --------------------------------------------------------------------------------------- + // Properties + // --------------------------------------------------------------------------------------- + + /** The schema to convert from Debezium's messages into Flink's objects. */ + private final DebeziumDeserializationSchema<T> deserializer; + + /** User-supplied properties for Kafka. * */ + private final Properties properties; + + /** The specific binlog offset to read from when the first startup. */ + private final @Nullable DebeziumOffset specificOffset; + + /** Data for pending but uncommitted offsets. */ + private final LinkedMap pendingOffsetsToCommit = new LinkedMap(); + + /** Flag indicating whether the Debezium Engine is started. */ + private volatile boolean debeziumStarted = false; + + /** Validator to validate the connected database satisfies the cdc connector's requirements. */ + private final Validator validator; + + // --------------------------------------------------------------------------------------- + // State + // --------------------------------------------------------------------------------------- + + /** + * The offsets to restore to, if the consumer restores state from a checkpoint. + * + * <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} + * method. + * + * <p>Using a String because we are encoding the offset state in JSON bytes. 
+ */ + private transient volatile String restoredOffsetState; + + /** Accessor for state in the operator state backend. */ + private transient ListState<byte[]> offsetState; + + /** + * State to store the history records, i.e. schema changes. + * + * @see FlinkDatabaseHistory + * @see FlinkDatabaseSchemaHistory + */ + private transient ListState<String> schemaRecordsState; + + // --------------------------------------------------------------------------------------- + // Worker + // --------------------------------------------------------------------------------------- + + private transient ExecutorService executor; + private transient DebeziumEngine<?> engine; + /** + * Unique name of this Debezium Engine instance across all the jobs. Currently we randomly + * generate a UUID for it. This is used for {@link FlinkDatabaseHistory}. + */ + private transient String engineInstanceName; + + /** Consume the events from the engine and commit the offset to the engine. */ + private transient DebeziumChangeConsumer changeConsumer; + + /** The consumer to fetch records from {@link Handover}. */ + private transient DebeziumChangeFetcher<T> debeziumChangeFetcher; + + /** Buffer the events from the source and record the errors from the debezium. */ + private transient Handover handover; + + // --------------------------------------------------------------------------------------- + + public DebeziumSourceFunction( + DebeziumDeserializationSchema<T> deserializer, + Properties properties, + @Nullable DebeziumOffset specificOffset, + Validator validator) { + this.deserializer = deserializer; + this.properties = properties; + this.specificOffset = specificOffset; + this.validator = validator; + } + + @Override + public void open(Configuration parameters) throws Exception { + validator.validate(); + super.open(parameters); + ThreadFactory threadFactory = + new ThreadFactoryBuilder().setNameFormat("debezium-engine").build(); + deserializer.open(); + this.executor = Executors.newSingleThreadExecutor(threadFactory); + this.handover = new Handover(); + this.changeConsumer = new DebeziumChangeConsumer(handover); + } + + // ------------------------------------------------------------------------ + // Checkpoint and restore + // ------------------------------------------------------------------------ + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + OperatorStateStore stateStore = context.getOperatorStateStore(); + this.offsetState = + stateStore.getUnionListState( + new ListStateDescriptor<>( + OFFSETS_STATE_NAME, + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)); + this.schemaRecordsState = + stateStore.getUnionListState( + new ListStateDescriptor<>( + HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO)); + + if (context.isRestored()) { + restoreOffsetState(); + restoreHistoryRecordsState(); + } else { + if (specificOffset != null) { + byte[] serializedOffset = + DebeziumOffsetSerializer.INSTANCE.serialize(specificOffset); + restoredOffsetState = new String(serializedOffset, StandardCharsets.UTF_8); + LOG.info( + "Consumer subtask {} starts to read from specified offset {}.", + getRuntimeContext().getIndexOfThisSubtask(), + restoredOffsetState); + } else { + LOG.info( + "Consumer subtask {} has no restore state.", + getRuntimeContext().getIndexOfThisSubtask()); + } + } + } + + private void restoreOffsetState() throws Exception { + for (byte[] serializedOffset : offsetState.get()) { + if (restoredOffsetState == null) { + restoredOffsetState = 
new String(serializedOffset, StandardCharsets.UTF_8); + } else { + throw new RuntimeException( + "Debezium Source only support single task, " + + "however, this is restored from multiple tasks."); + } + } + LOG.info( + "Consumer subtask {} restored offset state: {}.", + getRuntimeContext().getIndexOfThisSubtask(), + restoredOffsetState); + } + + private void restoreHistoryRecordsState() throws Exception { + DocumentReader reader = DocumentReader.defaultReader(); + ConcurrentLinkedQueue<SchemaRecord> historyRecords = new ConcurrentLinkedQueue<>(); + int recordsCount = 0; + boolean firstEntry = true; + for (String record : schemaRecordsState.get()) { + if (firstEntry) { + // we store the engine instance name in the first element + this.engineInstanceName = record; + firstEntry = false; + } else { + // Put the records into the state. The database history should read, reorganize and + // register the state. + historyRecords.add(new SchemaRecord(reader.read(record))); + recordsCount++; + } + } + if (engineInstanceName != null) { + registerHistory(engineInstanceName, historyRecords); + } + LOG.info( + "Consumer subtask {} restored history records state: {} with {} records.", + getRuntimeContext().getIndexOfThisSubtask(), + engineInstanceName, + recordsCount); + } + + @Override + public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception { + if (handover.hasError()) { + LOG.debug("snapshotState() called on closed source"); + throw new FlinkRuntimeException( + "Call snapshotState() on closed source, checkpoint failed."); + } else { + snapshotOffsetState(functionSnapshotContext.getCheckpointId()); + snapshotHistoryRecordsState(); + } + } + + private void snapshotOffsetState(long checkpointId) throws Exception { + offsetState.clear(); + + final DebeziumChangeFetcher<?> fetcher = this.debeziumChangeFetcher; + + byte[] serializedOffset = null; + if (fetcher == null) { + // the fetcher has not yet been initialized, which means we need to return the + // originally restored offsets + if (restoredOffsetState != null) { + serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8); + } + } else { + byte[] currentState = fetcher.snapshotCurrentState(); + if (currentState == null && restoredOffsetState != null) { + // the fetcher has been initialized, but has not yet received any data, + // which means we need to return the originally restored offsets. 
+ serializedOffset = restoredOffsetState.getBytes(StandardCharsets.UTF_8); + } else { + serializedOffset = currentState; + } + } + + if (serializedOffset != null) { + offsetState.add(serializedOffset); + // the map cannot be asynchronously updated, because only one checkpoint call + // can happen on this function at a time: either snapshotState() or + // notifyCheckpointComplete() + pendingOffsetsToCommit.put(checkpointId, serializedOffset); + // truncate the map of pending offsets to commit, to prevent infinite growth + while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) { + pendingOffsetsToCommit.remove(0); + } + } + } + + private void snapshotHistoryRecordsState() throws Exception { + schemaRecordsState.clear(); + + if (engineInstanceName != null) { + schemaRecordsState.add(engineInstanceName); + Collection<SchemaRecord> records = retrieveHistory(engineInstanceName); + DocumentWriter writer = DocumentWriter.defaultWriter(); + for (SchemaRecord record : records) { + schemaRecordsState.add(writer.write(record.toDocument())); + } + } + } + + @Override + public void run(SourceContext<T> sourceContext) throws Exception { + properties.setProperty("name", "engine"); + properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName()); + if (restoredOffsetState != null) { + // restored from state + properties.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, restoredOffsetState); + } + // DO NOT include schema change, e.g. DDL + properties.setProperty("include.schema.changes", "false"); + // disable the offset flush totally + properties.setProperty("offset.flush.interval.ms", String.valueOf(Long.MAX_VALUE)); + // disable tombstones + properties.setProperty("tombstones.on.delete", "false"); + if (engineInstanceName == null) { + // not restore from recovery + engineInstanceName = UUID.randomUUID().toString(); + } + // history instance name to initialize FlinkDatabaseHistory + properties.setProperty( + FlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME, engineInstanceName); + // we have to use a persisted DatabaseHistory implementation, otherwise, recovery can't + // continue to read binlog + // see + // https://stackoverflow.com/questions/57147584/debezium-error-schema-isnt-know-to-this-connector + // and https://debezium.io/blog/2018/03/16/note-on-database-history-topic-configuration/ + properties.setProperty("database.history", determineDatabase().getCanonicalName()); + + // we have to filter out the heartbeat events, otherwise the deserializer will fail + String dbzHeartbeatPrefix = + properties.getProperty( + Heartbeat.HEARTBEAT_TOPICS_PREFIX.name(), + Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString()); + this.debeziumChangeFetcher = + new DebeziumChangeFetcher<>( + sourceContext, + deserializer, + restoredOffsetState == null, // DB snapshot phase if restore state is null + dbzHeartbeatPrefix, + handover); + + // create the engine with this configuration ... + this.engine = + DebeziumEngine.create(Connect.class) + .using(properties) + .notifying(changeConsumer) + .using(OffsetCommitPolicy.always()) + .using( + (success, message, error) -> { + if (success) { + // Close the handover and prepare to exit. 
+ handover.close(); + } else { + handover.reportError(error); + } + }) + .build(); + + // run the engine asynchronously + executor.execute(engine); + debeziumStarted = true; + + // initialize metrics + // make RuntimeContext#getMetricGroup compatible between Flink 1.13 and Flink 1.14 + final Method getMetricGroupMethod = + getRuntimeContext().getClass().getMethod("getMetricGroup"); + getMetricGroupMethod.setAccessible(true); + final MetricGroup metricGroup = + (MetricGroup) getMetricGroupMethod.invoke(getRuntimeContext()); + + metricGroup.gauge( + "currentFetchEventTimeLag", + (Gauge<Long>) () -> debeziumChangeFetcher.getFetchDelay()); + metricGroup.gauge( + "currentEmitEventTimeLag", + (Gauge<Long>) () -> debeziumChangeFetcher.getEmitDelay()); + metricGroup.gauge( + "sourceIdleTime", (Gauge<Long>) () -> debeziumChangeFetcher.getIdleTime()); + + // start the real debezium consumer + debeziumChangeFetcher.runFetchLoop(); + } + + @Override + public void notifyCheckpointComplete(long checkpointId) { + if (!debeziumStarted) { + LOG.debug("notifyCheckpointComplete() called when engine is not started."); + return; + } + + final DebeziumChangeFetcher<T> fetcher = this.debeziumChangeFetcher; + if (fetcher == null) { + LOG.debug("notifyCheckpointComplete() called on uninitialized source"); + return; + } + + try { + final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId); + if (posInMap == -1) { + LOG.warn( + "Consumer subtask {} received confirmation for unknown checkpoint id {}", + getRuntimeContext().getIndexOfThisSubtask(), + checkpointId); + return; + } + + byte[] serializedOffsets = (byte[]) pendingOffsetsToCommit.remove(posInMap); + + // remove older checkpoints in map + for (int i = 0; i < posInMap; i++) { + pendingOffsetsToCommit.remove(0); + } + + if (serializedOffsets == null || serializedOffsets.length == 0) { + LOG.debug( + "Consumer subtask {} has empty checkpoint state.", + getRuntimeContext().getIndexOfThisSubtask()); + return; + } + + DebeziumOffset offset = + DebeziumOffsetSerializer.INSTANCE.deserialize(serializedOffsets); + changeConsumer.commitOffset(offset); + } catch (Exception e) { + // ignore exception if we are no longer running + LOG.warn("Ignore error when committing offset to database.", e); + } + } + + @Override + public void cancel() { + // safely and gracefully stop the engine + shutdownEngine(); + if (debeziumChangeFetcher != null) { + debeziumChangeFetcher.close(); + } + } + + @Override + public void close() throws Exception { + cancel(); + + if (executor != null) { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); + } + + super.close(); + } + + /** Safely and gracefully stop the Debezium engine. 
*/ + private void shutdownEngine() { + try { + if (engine != null) { + engine.close(); + } + } catch (IOException e) { + ExceptionUtils.rethrow(e); + } finally { + if (executor != null) { + executor.shutdownNow(); + } + + debeziumStarted = false; + + if (handover != null) { + handover.close(); + } + } + } + + @Override + public TypeInformation<T> getProducedType() { + return deserializer.getProducedType(); + } + + @VisibleForTesting + public LinkedMap getPendingOffsetsToCommit() { + return pendingOffsetsToCommit; + } + + @VisibleForTesting + public boolean getDebeziumStarted() { + return debeziumStarted; + } + + private Class<?> determineDatabase() { + boolean isCompatibleWithLegacy = + FlinkDatabaseHistory.isCompatible(retrieveHistory(engineInstanceName)); + if (LEGACY_IMPLEMENTATION_VALUE.equals(properties.get(LEGACY_IMPLEMENTATION_KEY))) { + // specifies the legacy implementation but the state may be incompatible + if (isCompatibleWithLegacy) { + return FlinkDatabaseHistory.class; + } else { + throw new IllegalStateException( + "The configured option 'debezium.internal.implementation' is 'legacy', but the state of source is incompatible with this implementation, you should remove the the option."); + } + } else if (FlinkDatabaseSchemaHistory.isCompatible(retrieveHistory(engineInstanceName))) { + // tries the non-legacy first + return FlinkDatabaseSchemaHistory.class; + } else if (isCompatibleWithLegacy) { + // fallback to legacy if possible + return FlinkDatabaseHistory.class; + } else { + // impossible + throw new IllegalStateException("Can't determine which DatabaseHistory to use."); + } + } + + @VisibleForTesting + public String getEngineInstanceName() { + return engineInstanceName; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java new file mode 100644 index 0000000000..040541b827 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.postgre; + +import com.ververica.cdc.debezium.Validator; +import io.debezium.connector.postgresql.PostgresConnector; + +import java.time.Duration; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A builder to build a SourceFunction which can read snapshot and continue to consume binlog for + * PostgreSQL. 
+ * <p> + * Copy from com.ververica:flink-connector-postgres-cdc-2.3.0 + * */ +public class PostgreSQLSource { + + private static final long DEFAULT_HEARTBEAT_MS = Duration.ofMinutes(5).toMillis(); + + public static <T> Builder<T> builder() { + return new Builder<>(); + } + + /** Builder class of {@link PostgreSQLSource}. */ + public static class Builder<T> { + + private String pluginName = "decoderbufs"; + private String slotName = "flink"; + private int port = 5432; // default 5432 port + private String hostname; + private String database; + private String username; + private String password; + private String[] schemaList; + private String[] tableList; + private Properties dbzProperties; + private DebeziumDeserializationSchema<T> deserializer; + + /** + * The name of the Postgres logical decoding plug-in installed on the server. Supported + * values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming, + * wal2json_rds_streaming and pgoutput. + */ + public Builder<T> decodingPluginName(String name) { + this.pluginName = name; + return this; + } + + public Builder<T> hostname(String hostname) { + this.hostname = hostname; + return this; + } + + /** Integer port number of the PostgreSQL database server. */ + public Builder<T> port(int port) { + this.port = port; + return this; + } + + /** The name of the PostgreSQL database from which to stream the changes. */ + public Builder<T> database(String database) { + this.database = database; + return this; + } + + /** + * An optional list of regular expressions that match schema names to be monitored; any + * schema name not included in the whitelist will be excluded from monitoring. By default + * all non-system schemas will be monitored. + */ + public Builder<T> schemaList(String... schemaList) { + this.schemaList = schemaList; + return this; + } + + /** + * An optional list of regular expressions that match fully-qualified table identifiers for + * tables to be monitored; any table not included in the whitelist will be excluded from + * monitoring. Each identifier is of the form schemaName.tableName. By default the connector + * will monitor every non-system table in each monitored schema. + */ + public Builder<T> tableList(String... tableList) { + this.tableList = tableList; + return this; + } + + /** + * Name of the PostgreSQL database to use when connecting to the PostgreSQL database server. + */ + public Builder<T> username(String username) { + this.username = username; + return this; + } + + /** Password to use when connecting to the PostgreSQL database server. */ + public Builder<T> password(String password) { + this.password = password; + return this; + } + + /** + * The name of the PostgreSQL logical decoding slot that was created for streaming changes + * from a particular plug-in for a particular database/schema. The server uses this slot to + * stream events to the connector that you are configuring. Default is "flink". + * + * <p>Slot names must conform to <a + * href="https://www.postgresql.org/docs/current/static/warm-standby.html#STREAMING-REPLICATION-SLOTS-MANIPULATION">PostgreSQL + * replication slot naming rules</a>, which state: "Each replication slot has a name, which + * can contain lower-case letters, numbers, and the underscore character." + */ + public Builder<T> slotName(String slotName) { + this.slotName = slotName; + return this; + } + + /** The Debezium Postgres connector properties. 
*/ + public Builder<T> debeziumProperties(Properties properties) { + this.dbzProperties = properties; + return this; + } + + /** + * The deserializer used to convert from consumed {@link + * org.apache.kafka.connect.source.SourceRecord}. + */ + public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) { + this.deserializer = deserializer; + return this; + } + + public DebeziumSourceFunction<T> build() { + Properties props = new Properties(); + props.setProperty("connector.class", PostgresConnector.class.getCanonicalName()); + props.setProperty("plugin.name", pluginName); + // hard code server name, because we don't need to distinguish it, docs: + // Logical name that identifies and provides a namespace for the particular PostgreSQL + // database server/cluster being monitored. The logical name should be unique across + // all other connectors, since it is used as a prefix for all Kafka topic names coming + // from this connector. Only alphanumeric characters and underscores should be used. + props.setProperty("database.server.name", "postgres_cdc_source"); + props.setProperty("database.hostname", checkNotNull(hostname)); + props.setProperty("database.dbname", checkNotNull(database)); + props.setProperty("database.user", checkNotNull(username)); + props.setProperty("database.password", checkNotNull(password)); + props.setProperty("database.port", String.valueOf(port)); + props.setProperty("slot.name", slotName); + // we have to enable heartbeat for PG to make sure DebeziumChangeConsumer#handleBatch + // is invoked after job restart + props.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_MS)); + + if (schemaList != null) { + props.setProperty("schema.whitelist", String.join(",", schemaList)); + } + if (tableList != null) { + props.setProperty("table.whitelist", String.join(",", tableList)); + } + + if (dbzProperties != null) { + props.putAll(dbzProperties); + } + + return new DebeziumSourceFunction<>( + deserializer, props, null, Validator.getDefaultValidator()); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java index 22fb76a4e1..6e4bd7c922 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java @@ -18,13 +18,9 @@ package org.apache.inlong.sort.postgre; import org.apache.inlong.sort.base.metric.MetricOption; -import org.apache.inlong.sort.base.metric.SourceMetricData; -import com.ververica.cdc.connectors.postgres.PostgreSQLSource; import com.ververica.cdc.connectors.postgres.table.PostgreSQLDeserializationConverterFactory; import com.ververica.cdc.connectors.postgres.table.PostgreSQLReadableMetadata; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; -import com.ververica.cdc.debezium.DebeziumSourceFunction; import com.ververica.cdc.debezium.table.DebeziumChangelogMode; import com.ververica.cdc.debezium.table.MetadataConverter; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -139,7 +135,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe PostgreSQLDeserializationConverterFactory.instance()) 
.setValueValidator(new PostgresValueValidator(schemaName, tableName)) .setChangelogMode(changelogMode) - .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption)) + .setMetricOption(metricOption) .build(); DebeziumSourceFunction<RowData> sourceFunction = PostgreSQLSource.<RowData>builder() @@ -237,7 +233,8 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe dbzProperties, producedDataType, metadataKeys, - changelogMode); + changelogMode, + metricOption); } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java index c6cf4e0d54..de701a2ae1 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java @@ -17,10 +17,10 @@ package org.apache.inlong.sort.postgre; +import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; import org.apache.inlong.sort.base.metric.SourceMetricData; -import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import com.ververica.cdc.debezium.table.AppendMetadataCollector; import com.ververica.cdc.debezium.table.DebeziumChangelogMode; import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter; @@ -100,7 +100,8 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali /** Changelog Mode to use for encoding changes in Flink internal data structure. */ private final DebeziumChangelogMode changelogMode; - private final SourceMetricData sourceMetricData; + private final MetricOption metricOption; + private SourceMetricData sourceMetricData; /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. 
*/ public static Builder newBuilder() { @@ -115,7 +116,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali ZoneId serverTimeZone, DeserializationRuntimeConverterFactory userDefinedConverterFactory, DebeziumChangelogMode changelogMode, - SourceMetricData sourceMetricData) { + MetricOption metricOption) { this.hasMetadata = checkNotNull(metadataConverters).length > 0; this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters); this.physicalConverter = @@ -126,7 +127,14 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali this.resultTypeInfo = checkNotNull(resultTypeInfo); this.validator = checkNotNull(validator); this.changelogMode = checkNotNull(changelogMode); - this.sourceMetricData = sourceMetricData; + this.metricOption = metricOption; + } + + @Override + public void open() { + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption); + } } @Override @@ -146,6 +154,9 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali GenericRowData delete = extractBeforeRow(value, valueSchema); validator.validate(delete, RowKind.DELETE); delete.setRowKind(RowKind.DELETE); + if (sourceMetricData != null) { + out = new MetricsCollector<>(out, sourceMetricData); + } emit(record, delete, out); } else { if (changelogMode == DebeziumChangelogMode.ALL) { @@ -208,7 +219,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali private DeserializationRuntimeConverterFactory userDefinedConverterFactory = DeserializationRuntimeConverterFactory.DEFAULT; private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL; - private SourceMetricData sourceMetricData; + private MetricOption metricOption; public Builder setPhysicalRowType(RowType physicalRowType) { this.physicalRowType = physicalRowType; @@ -240,8 +251,8 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali this.changelogMode = changelogMode; return this; } - public Builder setSourceMetricData(SourceMetricData sourceMetricData) { - this.sourceMetricData = sourceMetricData; + public Builder setMetricOption(MetricOption metricOption) { + this.metricOption = metricOption; return this; } @@ -254,7 +265,7 @@ public final class RowDataDebeziumDeserializeSchema implements DebeziumDeseriali serverTimeZone, userDefinedConverterFactory, changelogMode, - sourceMetricData); + metricOption); } } diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index 262d48fce0..261b1c9f97 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -862,10 +862,14 @@ Source : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software have been modified.) 
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE -1.3.24 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java - inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java +1.3.24 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumChangeFetcher.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumDeserializationSchema.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/DebeziumSourceFunction.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLSource.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java Source : com.ververica:flink-connector-postgres-cdc:2.3.0 (Please note that the software have been modified.) License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
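
The essence of this fix is visible in the RowDataDebeziumDeserializeSchema diff above: the deserialization schema now keeps only the serializable MetricOption and defers building SourceMetricData (which wraps the non-serializable AuditOperator) to the new open() hook, invoked from DebeziumSourceFunction#open after deployment. Below is a minimal sketch, not part of the commit, of a custom deserialization schema following the same pattern against the copied interfaces shown in this diff; the class name, field names, and the elided conversion logic are illustrative assumptions.

    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricsCollector;
    import org.apache.inlong.sort.base.metric.SourceMetricData;
    import org.apache.inlong.sort.postgre.DebeziumDeserializationSchema;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.source.SourceRecord;

    /**
     * Illustrative schema (hypothetical name) that stays serializable by shipping only the
     * MetricOption with the job graph and creating the metric objects lazily in open().
     */
    public class MetricAwareDeserializationSchema implements DebeziumDeserializationSchema<RowData> {

        private static final long serialVersionUID = 1L;

        /** Serializable configuration, safe to serialize with the source function. */
        private final MetricOption metricOption;

        /** Created in open() on the task manager; never serialized. */
        private transient SourceMetricData sourceMetricData;

        public MetricAwareDeserializationSchema(MetricOption metricOption) {
            this.metricOption = metricOption;
        }

        @Override
        public void open() {
            if (metricOption != null) {
                sourceMetricData = new SourceMetricData(metricOption);
            }
        }

        @Override
        public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
            // Wrap the collector so emitted rows are reported, mirroring the change in
            // RowDataDebeziumDeserializeSchema#deserialize in this commit.
            Collector<RowData> collector =
                    sourceMetricData != null ? new MetricsCollector<>(out, sourceMetricData) : out;
            // ... convert the Debezium SourceRecord into RowData and emit via collector ...
        }

        @Override
        public TypeInformation<RowData> getProducedType() {
            return TypeInformation.of(RowData.class);
        }
    }

Such a schema would be handed to PostgreSQLSource.<RowData>builder().deserializer(...) in the same way PostgreSQLTableSource wires up RowDataDebeziumDeserializeSchema above: the schema instance is serialized with the job graph, and the metric objects are only instantiated once open() runs on the task manager.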