This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 2acf82d174 [INLONG-10238][Sort] MySQL connector support audit ID (#10239)
2acf82d174 is described below

commit 2acf82d174c6358447422006f271785d9c59e2ab
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed May 22 09:46:55 2024 +0800

    [INLONG-10238][Sort] MySQL connector support audit ID (#10239)
---
 .../apache/inlong/sort/mysql/MySqlTableSource.java | 372 +++++++++++
 .../inlong/sort/mysql/MysqlTableFactory.java       |  18 +-
 .../mysql/RowDataDebeziumDeserializeSchema.java    | 687 +++++++++++++++++++++
 licenses/inlong-sort-connectors/LICENSE            |  14 +-
 4 files changed, 1082 insertions(+), 9 deletions(-)
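[Editor's note] In practical terms, this change lets a Flink SQL job attach InLong metric and audit reporting to a MySQL CDC source table. A minimal sketch of such a declaration follows; the connector identifier and the option keys 'inlong.metric.labels', 'metrics.audit.proxy.hosts', and 'metrics.audit.key' are assumptions inferred from the INLONG_METRIC, INLONG_AUDIT, and AUDIT_KEYS constants used in MysqlTableFactory below, not spelled out in this diff:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlAuditExample {

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Option keys below are assumed from the factory's constants, not confirmed by this diff.
        tEnv.executeSql(
                "CREATE TABLE mysql_source (\n"
                        + "  id INT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc-inlong',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'inlong',\n"
                        + "  'password' = '******',\n"
                        + "  'database-name' = 'test',\n"
                        + "  'table-name' = 'user',\n"
                        + "  'inlong.metric.labels' = 'groupId=g1&streamId=s1&nodeId=n1',\n"
                        + "  'metrics.audit.proxy.hosts' = '127.0.0.1:10081',\n"
                        + "  'metrics.audit.key' = '3'\n"
                        + ")");
    }
}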
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
new file mode 100644
index 0000000000..c244378aca
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mysql;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import com.ververica.cdc.connectors.mysql.source.MySqlSource;
+import com.ververica.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
+import com.ververica.cdc.connectors.mysql.table.MySqlReadableMetadata;
+import com.ververica.cdc.connectors.mysql.table.StartupOptions;
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.DebeziumSourceFunction;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.time.ZoneId;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link DynamicTableSource} that describes how to create a MySQL binlog source from a logical
+ * description.
+ * <p>
+ * Copy from com.ververica:flink-connector-mysql-cdc-2.3.0
+ */
+public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadata {
+
+    private final ResolvedSchema physicalSchema;
+    private final int port;
+    private final String hostname;
+    private final String database;
+    private final String username;
+    private final String password;
+    private final String serverId;
+    private final String tableName;
+    private final ZoneId serverTimeZone;
+    private final Properties dbzProperties;
+    private final boolean enableParallelRead;
+    private final int splitSize;
+    private final int splitMetaGroupSize;
+    private final int fetchSize;
+    private final Duration connectTimeout;
+    private final int connectionPoolSize;
+    private final int connectMaxRetries;
+    private final double distributionFactorUpper;
+    private final double distributionFactorLower;
+    private final StartupOptions startupOptions;
+    private final boolean scanNewlyAddedTableEnabled;
+    private final Properties jdbcProperties;
+    private final Duration heartbeatInterval;
+    private final String chunkKeyColumn;
+
+    // --------------------------------------------------------------------------------------------
+    // Mutable attributes
+    // --------------------------------------------------------------------------------------------
+
+    /** Data type that describes the final output of the source. */
+    protected DataType producedDataType;
+
+    /** Metadata that is appended at the end of a physical source row. */
+    protected List<String> metadataKeys;
+
+    private final MetricOption metricOption;
+
+    public MySqlTableSource(
+            ResolvedSchema physicalSchema,
+            int port,
+            String hostname,
+            String database,
+            String tableName,
+            String username,
+            String password,
+            ZoneId serverTimeZone,
+            Properties dbzProperties,
+            @Nullable String serverId,
+            boolean enableParallelRead,
+            int splitSize,
+            int splitMetaGroupSize,
+            int fetchSize,
+            Duration connectTimeout,
+            int connectMaxRetries,
+            int connectionPoolSize,
+            double distributionFactorUpper,
+            double distributionFactorLower,
+            StartupOptions startupOptions,
+            boolean scanNewlyAddedTableEnabled,
+            Properties jdbcProperties,
+            Duration heartbeatInterval,
+            @Nullable String chunkKeyColumn,
+            MetricOption metricOption) {
+        this.physicalSchema = physicalSchema;
+        this.port = port;
+        this.hostname = checkNotNull(hostname);
+        this.database = checkNotNull(database);
+        this.tableName = checkNotNull(tableName);
+        this.username = checkNotNull(username);
+        this.password = checkNotNull(password);
+        this.serverId = serverId;
+        this.serverTimeZone = serverTimeZone;
+        this.dbzProperties = dbzProperties;
+        this.enableParallelRead = enableParallelRead;
+        this.splitSize = splitSize;
+        this.splitMetaGroupSize = splitMetaGroupSize;
+        this.fetchSize = fetchSize;
+        this.connectTimeout = connectTimeout;
+        this.connectMaxRetries = connectMaxRetries;
+        this.connectionPoolSize = connectionPoolSize;
+        this.distributionFactorUpper = distributionFactorUpper;
+        this.distributionFactorLower = distributionFactorLower;
+        this.startupOptions = startupOptions;
+        this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
+        this.jdbcProperties = jdbcProperties;
+        // Mutable attributes
+        this.producedDataType = physicalSchema.toPhysicalRowDataType();
+        this.metadataKeys = Collections.emptyList();
+        this.heartbeatInterval = heartbeatInterval;
+        this.chunkKeyColumn = chunkKeyColumn;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.newBuilder()
+                .addContainedKind(RowKind.INSERT)
+                .addContainedKind(RowKind.UPDATE_BEFORE)
+                .addContainedKind(RowKind.UPDATE_AFTER)
+                .addContainedKind(RowKind.DELETE)
+                .build();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        RowType physicalDataType =
+                (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+        MetadataConverter[] metadataConverters = getMetadataConverters();
+        final TypeInformation<RowData> typeInfo =
+                scanContext.createTypeInformation(producedDataType);
+
+        DebeziumDeserializationSchema<RowData> deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType(physicalDataType)
+                        .setMetadataConverters(metadataConverters)
+                        .setResultTypeInfo(typeInfo)
+                        .setServerTimeZone(serverTimeZone)
+                        .setUserDefinedConverterFactory(
+                                MySqlDeserializationConverterFactory.instance())
+                        .setSourceMetricData(metricOption == null ? null : new SourceMetricData(metricOption))
+                        .build();
+        if (enableParallelRead) {
+            MySqlSource<RowData> parallelSource =
+                    MySqlSource.<RowData>builder()
+                            .hostname(hostname)
+                            .port(port)
+                            .databaseList(database)
+                            .tableList(database + "." + tableName)
+                            .username(username)
+                            .password(password)
+                            .serverTimeZone(serverTimeZone.toString())
+                            .serverId(serverId)
+                            .splitSize(splitSize)
+                            .splitMetaGroupSize(splitMetaGroupSize)
+                            .distributionFactorUpper(distributionFactorUpper)
+                            .distributionFactorLower(distributionFactorLower)
+                            .fetchSize(fetchSize)
+                            .connectTimeout(connectTimeout)
+                            .connectMaxRetries(connectMaxRetries)
+                            .connectionPoolSize(connectionPoolSize)
+                            .debeziumProperties(dbzProperties)
+                            .startupOptions(startupOptions)
+                            .deserializer(deserializer)
+                            .scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
+                            .jdbcProperties(jdbcProperties)
+                            .heartbeatInterval(heartbeatInterval)
+                            .chunkKeyColumn(chunkKeyColumn)
+                            .build();
+            return SourceProvider.of(parallelSource);
+        } else {
+            com.ververica.cdc.connectors.mysql.MySqlSource.Builder<RowData> builder =
+                    com.ververica.cdc.connectors.mysql.MySqlSource.<RowData>builder()
+                            .hostname(hostname)
+                            .port(port)
+                            .databaseList(database)
+                            .tableList(database + "." + tableName)
+                            .username(username)
+                            .password(password)
+                            .serverTimeZone(serverTimeZone.toString())
+                            .debeziumProperties(dbzProperties)
+                            .startupOptions(startupOptions)
+                            .deserializer(deserializer);
+            Optional.ofNullable(serverId)
+                    .ifPresent(serverId -> builder.serverId(Integer.parseInt(serverId)));
+            DebeziumSourceFunction<RowData> sourceFunction = builder.build();
+            return SourceFunctionProvider.of(sourceFunction, false);
+        }
+    }
+
+    protected MetadataConverter[] getMetadataConverters() {
+        if (metadataKeys.isEmpty()) {
+            return new MetadataConverter[0];
+        }
+
+        return metadataKeys.stream()
+                .map(
+                        key -> Stream.of(MySqlReadableMetadata.values())
+                                .filter(m -> m.getKey().equals(key))
+                                .findFirst()
+                                .orElseThrow(IllegalStateException::new))
+                .map(MySqlReadableMetadata::getConverter)
+                .toArray(MetadataConverter[]::new);
+    }
+
+    @Override
+    public Map<String, DataType> listReadableMetadata() {
+        return Stream.of(MySqlReadableMetadata.values())
+                .collect(
+                        Collectors.toMap(
+                                MySqlReadableMetadata::getKey, MySqlReadableMetadata::getDataType));
+    }
+
+    @Override
+    public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+        this.metadataKeys = metadataKeys;
+        this.producedDataType = producedDataType;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        MySqlTableSource source =
+                new MySqlTableSource(
+                        physicalSchema,
+                        port,
+                        hostname,
+                        database,
+                        tableName,
+                        username,
+                        password,
+                        serverTimeZone,
+                        dbzProperties,
+                        serverId,
+                        enableParallelRead,
+                        splitSize,
+                        splitMetaGroupSize,
+                        fetchSize,
+                        connectTimeout,
+                        connectMaxRetries,
+                        connectionPoolSize,
+                        distributionFactorUpper,
+                        distributionFactorLower,
+                        startupOptions,
+                        scanNewlyAddedTableEnabled,
+                        jdbcProperties,
+                        heartbeatInterval,
+                        chunkKeyColumn,
+                        metricOption);
+        source.metadataKeys = metadataKeys;
+        source.producedDataType = producedDataType;
+        return source;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof MySqlTableSource)) {
+            return false;
+        }
+        MySqlTableSource that = (MySqlTableSource) o;
+        return port == that.port
+                && enableParallelRead == that.enableParallelRead
+                && splitSize == that.splitSize
+                && splitMetaGroupSize == that.splitMetaGroupSize
+                && fetchSize == that.fetchSize
+                && distributionFactorUpper == that.distributionFactorUpper
+                && distributionFactorLower == that.distributionFactorLower
+                && scanNewlyAddedTableEnabled == that.scanNewlyAddedTableEnabled
+                && Objects.equals(physicalSchema, that.physicalSchema)
+                && Objects.equals(hostname, that.hostname)
+                && Objects.equals(database, that.database)
+                && Objects.equals(username, that.username)
+                && Objects.equals(password, that.password)
+                && Objects.equals(serverId, that.serverId)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(serverTimeZone, that.serverTimeZone)
+                && Objects.equals(dbzProperties, that.dbzProperties)
+                && Objects.equals(connectTimeout, that.connectTimeout)
+                && Objects.equals(connectMaxRetries, that.connectMaxRetries)
+                && Objects.equals(connectionPoolSize, that.connectionPoolSize)
+                && Objects.equals(startupOptions, that.startupOptions)
+                && Objects.equals(producedDataType, that.producedDataType)
+                && Objects.equals(metadataKeys, that.metadataKeys)
+                && Objects.equals(jdbcProperties, that.jdbcProperties)
+                && Objects.equals(heartbeatInterval, that.heartbeatInterval)
+                && Objects.equals(chunkKeyColumn, that.chunkKeyColumn)
+                && Objects.equals(metricOption, that.metricOption);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                physicalSchema,
+                port,
+                hostname,
+                database,
+                username,
+                password,
+                serverId,
+                tableName,
+                serverTimeZone,
+                dbzProperties,
+                enableParallelRead,
+                splitSize,
+                splitMetaGroupSize,
+                fetchSize,
+                connectTimeout,
+                connectMaxRetries,
+                connectionPoolSize,
+                distributionFactorUpper,
+                distributionFactorLower,
+                startupOptions,
+                producedDataType,
+                metadataKeys,
+                scanNewlyAddedTableEnabled,
+                jdbcProperties,
+                heartbeatInterval,
+                chunkKeyColumn,
+                metricOption);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "MySQL-CDC";
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
index c684cc3ae4..f903780a36 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
@@ -17,11 +17,12 @@
 
 package org.apache.inlong.sort.mysql;
 
+import org.apache.inlong.sort.base.metric.MetricOption;
+
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
 import com.ververica.cdc.connectors.mysql.source.config.ServerIdRange;
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
 import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder;
-import com.ververica.cdc.connectors.mysql.table.MySqlTableSource;
 import com.ververica.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.annotation.Experimental;
 import org.apache.flink.configuration.ConfigOption;
@@ -48,7 +49,8 @@ import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProper
 import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
-import static org.apache.inlong.sort.base.Constants.*;
+import static org.apache.inlong.sort.base.Constants.GH_OST_DDL_CHANGE;
+import static org.apache.inlong.sort.base.Constants.GH_OST_TABLE_REGEX;
 
 public class MysqlTableFactory implements DynamicTableSourceFactory {
 
@@ -97,6 +99,15 @@
             validateDistributionFactorLower(distributionFactorLower);
         }
 
+        String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = config.get(INLONG_AUDIT);
+        String auditKeys = config.get(AUDIT_KEYS);
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+
         return new MySqlTableSource(physicalSchema,
                 port,
                 hostname,
@@ -120,7 +131,8 @@ public class MysqlTableFactory implements DynamicTableSourceFactory {
                 scanNewlyAddedTableEnabled,
                 getJdbcProperties(context.getCatalogTable().getOptions()),
                 heartbeatInterval,
-                chunkKeyColumn);
+                chunkKeyColumn,
+                metricOption);
     }
 
     @Override
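[Editor's note] The MetricOption built in the factory is what ultimately carries the audit ID configuration into the source. Constructed standalone with illustrative values (the builder methods are taken verbatim from the diff; the literal values are placeholders):

MetricOption metricOption = MetricOption.builder()
        .withInlongLabels("groupId=g1&streamId=s1&nodeId=n1") // from INLONG_METRIC
        .withAuditAddress("127.0.0.1:10081")                  // audit proxy host:port, from INLONG_AUDIT
        .withAuditKeys("3")                                   // audit ID(s) to report, from AUDIT_KEYS
        .build();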
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
new file mode 100644
index 0000000000..e98a700b62
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
@@ -0,0 +1,687 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.mysql;
+
+import org.apache.inlong.sort.base.metric.MetricsCollector;
+import org.apache.inlong.sort.base.metric.SourceMetricData;
+
+import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
+import com.ververica.cdc.debezium.table.AppendMetadataCollector;
+import com.ververica.cdc.debezium.table.DebeziumChangelogMode;
+import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
+import com.ververica.cdc.debezium.table.DeserializationRuntimeConverterFactory;
+import com.ververica.cdc.debezium.table.MetadataConverter;
+import com.ververica.cdc.debezium.utils.TemporalConversions;
+import io.debezium.data.Envelope;
+import io.debezium.data.SpecialValueDecimal;
+import io.debezium.data.VariableScaleDecimal;
+import io.debezium.time.MicroTime;
+import io.debezium.time.MicroTimestamp;
+import io.debezium.time.NanoTime;
+import io.debezium.time.NanoTimestamp;
+import io.debezium.time.Timestamp;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from Debezium object to Flink Table/SQL internal data structure {@link
+ * RowData}.
+ * <p>
+ * Copy from com.ververica:flink-connector-mysql-cdc-2.3.0
+ */
+public final class RowDataDebeziumDeserializeSchema implements DebeziumDeserializationSchema<RowData> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RowDataDebeziumDeserializeSchema.class);
+
+    private static final long serialVersionUID = 2L;
+
+    /** Custom validator to validate the row value. */
+    public interface ValueValidator extends Serializable {
+
+        void validate(RowData rowData, RowKind rowKind) throws Exception;
+    }
+
+    /** TypeInformation of the produced {@link RowData}. */
+    private final TypeInformation<RowData> resultTypeInfo;
+
+    /**
+     * Runtime converter that converts Kafka {@link SourceRecord}s into {@link RowData} consisting
+     * of physical column values.
+     */
+    private final DeserializationRuntimeConverter physicalConverter;
+
+    /** Whether the deserializer needs to handle metadata columns. */
+    private final boolean hasMetadata;
+
+    /**
+     * A wrapped output collector which is used to append metadata columns after physical columns.
+     */
+    private final AppendMetadataCollector appendMetadataCollector;
+
+    /** Validator to validate the row value. */
+    private final ValueValidator validator;
+
+    /** Changelog mode to use for encoding changes in Flink's internal data structures. */
+    private final DebeziumChangelogMode changelogMode;
+    private final SourceMetricData sourceMetricData;
+
+    /** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
+    RowDataDebeziumDeserializeSchema(
+            RowType physicalDataType,
+            MetadataConverter[] metadataConverters,
+            TypeInformation<RowData> resultTypeInfo,
+            ValueValidator validator,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory,
+            DebeziumChangelogMode changelogMode,
+            SourceMetricData sourceMetricData) {
+        this.hasMetadata = checkNotNull(metadataConverters).length > 0;
+        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+        this.physicalConverter =
+                createConverter(
+                        checkNotNull(physicalDataType),
+                        serverTimeZone,
+                        userDefinedConverterFactory);
+        this.resultTypeInfo = checkNotNull(resultTypeInfo);
+        this.validator = checkNotNull(validator);
+        this.changelogMode = checkNotNull(changelogMode);
+        this.sourceMetricData = sourceMetricData;
+    }
+
+    @Override
+    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
+        Envelope.Operation op = Envelope.operationFor(record);
+        Struct value = (Struct) record.value();
+        Schema valueSchema = record.valueSchema();
+        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
+            GenericRowData insert = extractAfterRow(value, valueSchema);
+            validator.validate(insert, RowKind.INSERT);
+            insert.setRowKind(RowKind.INSERT);
+            if (sourceMetricData != null) {
+                out = new MetricsCollector<>(out, sourceMetricData);
+            }
+            emit(record, insert, out);
+        } else if (op == Envelope.Operation.DELETE) {
+            GenericRowData delete = extractBeforeRow(value, valueSchema);
+            validator.validate(delete, RowKind.DELETE);
+            delete.setRowKind(RowKind.DELETE);
+            emit(record, delete, out);
+        } else {
+            if (changelogMode == DebeziumChangelogMode.ALL) {
+                GenericRowData before = extractBeforeRow(value, valueSchema);
+                validator.validate(before, RowKind.UPDATE_BEFORE);
+                before.setRowKind(RowKind.UPDATE_BEFORE);
+                emit(record, before, out);
+            }
+
+            GenericRowData after = extractAfterRow(value, valueSchema);
+            validator.validate(after, RowKind.UPDATE_AFTER);
+            after.setRowKind(RowKind.UPDATE_AFTER);
+            if (sourceMetricData != null) {
+                out = new MetricsCollector<>(out, sourceMetricData);
+            }
+            emit(record, after, out);
+        }
+    }
+
+    private GenericRowData extractAfterRow(Struct value, Schema valueSchema) throws Exception {
+        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
+        Struct after = value.getStruct(Envelope.FieldName.AFTER);
+        return (GenericRowData) physicalConverter.convert(after, afterSchema);
+    }
+
+    private GenericRowData extractBeforeRow(Struct value, Schema valueSchema) throws Exception {
+        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
+        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
+        return (GenericRowData) physicalConverter.convert(before, beforeSchema);
+    }
+
+    private void emit(SourceRecord inRecord, RowData physicalRow, Collector<RowData> collector) {
+        if (!hasMetadata) {
+            collector.collect(physicalRow);
+            return;
+        }
+        appendMetadataCollector.inputRecord = inRecord;
+        appendMetadataCollector.outputCollector = collector;
+        appendMetadataCollector.collect(physicalRow);
+    }
+
+    @Override
+    public TypeInformation<RowData> getProducedType() {
+        return resultTypeInfo;
+    }
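[Editor's note] Observe that deserialize() wraps the collector in a MetricsCollector only for INSERT and UPDATE_AFTER rows, so the audit counters reflect records delivered downstream rather than every changelog entry. MetricsCollector comes from inlong-sort-base; conceptually it is a counting decorator along these lines (a sketch of the idea only, not the actual implementation — the accounting call is hypothetical):

// Sketch only; see org.apache.inlong.sort.base.metric.MetricsCollector for the real class.
class CountingCollector<T> implements Collector<T> {

    private final Collector<T> inner;
    private final SourceMetricData metricData;

    CountingCollector(Collector<T> inner, SourceMetricData metricData) {
        this.inner = inner;
        this.metricData = metricData;
    }

    @Override
    public void collect(T record) {
        metricData.outputMetrics(1, 0L); // hypothetical call: account for one record before forwarding
        inner.collect(record);
    }

    @Override
    public void close() {
        inner.close();
    }
}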
+
+    // -------------------------------------------------------------------------------------
+    // Builder
+    // -------------------------------------------------------------------------------------
+
+    /** Builder of {@link RowDataDebeziumDeserializeSchema}. */
+    public static class Builder {
+
+        private RowType physicalRowType;
+        private TypeInformation<RowData> resultTypeInfo;
+        private MetadataConverter[] metadataConverters = new MetadataConverter[0];
+        private ValueValidator validator = (rowData, rowKind) -> {
+        };
+        private ZoneId serverTimeZone = ZoneId.of("UTC");
+        private DeserializationRuntimeConverterFactory userDefinedConverterFactory =
+                DeserializationRuntimeConverterFactory.DEFAULT;
+        private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
+        private SourceMetricData sourceMetricData;
+
+        public Builder setPhysicalRowType(RowType physicalRowType) {
+            this.physicalRowType = physicalRowType;
+            return this;
+        }
+
+        public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
+            this.metadataConverters = metadataConverters;
+            return this;
+        }
+
+        public Builder setResultTypeInfo(TypeInformation<RowData> resultTypeInfo) {
+            this.resultTypeInfo = resultTypeInfo;
+            return this;
+        }
+
+        public Builder setValueValidator(ValueValidator validator) {
+            this.validator = validator;
+            return this;
+        }
+
+        public Builder setServerTimeZone(ZoneId serverTimeZone) {
+            this.serverTimeZone = serverTimeZone;
+            return this;
+        }
+
+        public Builder setUserDefinedConverterFactory(
+                DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+            this.userDefinedConverterFactory = userDefinedConverterFactory;
+            return this;
+        }
+
+        public Builder setChangelogMode(DebeziumChangelogMode changelogMode) {
+            this.changelogMode = changelogMode;
+            return this;
+        }
+
+        public Builder setSourceMetricData(SourceMetricData sourceMetricData) {
+            this.sourceMetricData = sourceMetricData;
+            return this;
+        }
+
+        public RowDataDebeziumDeserializeSchema build() {
+            return new RowDataDebeziumDeserializeSchema(
+                    physicalRowType,
+                    metadataConverters,
+                    resultTypeInfo,
+                    validator,
+                    serverTimeZone,
+                    userDefinedConverterFactory,
+                    changelogMode,
+                    sourceMetricData);
+        }
+    }
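[Editor's note] A minimal use of this builder, mirroring the call in MySqlTableSource.getScanRuntimeProvider above (physicalRowType, typeInfo, and metricOption are placeholders for the values the table source assembles):

RowDataDebeziumDeserializeSchema schema =
        RowDataDebeziumDeserializeSchema.newBuilder()
                .setPhysicalRowType(physicalRowType)  // RowType of the table's physical columns
                .setResultTypeInfo(typeInfo)          // TypeInformation<RowData> from the scan context
                .setServerTimeZone(ZoneId.of("UTC"))
                .setSourceMetricData(new SourceMetricData(metricOption))
                .build();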
+
+    // -------------------------------------------------------------------------------------
+    // Runtime Converters
+    // -------------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which is null safe. */
+    private static DeserializationRuntimeConverter createConverter(
+            LogicalType type,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        return wrapIntoNullableConverter(
+                createNotNullConverter(type, serverTimeZone, userDefinedConverterFactory));
+    }
+
+    // --------------------------------------------------------------------------------
+    // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is
+    // necessary because the maven shade plugin cannot relocate classes in
+    // SerializedLambdas (MSHADE-260).
+    // --------------------------------------------------------------------------------
+
+    /** Creates a runtime converter which assumes the input object is not null. */
+    public static DeserializationRuntimeConverter createNotNullConverter(
+            LogicalType type,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        // user-defined converters have a higher resolve order
+        Optional<DeserializationRuntimeConverter> converter =
+                userDefinedConverterFactory.createUserDefinedConverter(type, serverTimeZone);
+        if (converter.isPresent()) {
+            return converter.get();
+        }
+
+        // if no user-defined converter matched, fall back to the default converter
+        switch (type.getTypeRoot()) {
+            case NULL:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return null;
+                    }
+                };
+            case BOOLEAN:
+                return convertToBoolean();
+            case TINYINT:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return Byte.parseByte(dbzObj.toString());
+                    }
+                };
+            case SMALLINT:
+                return new DeserializationRuntimeConverter() {
+
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public Object convert(Object dbzObj, Schema schema) {
+                        return Short.parseShort(dbzObj.toString());
+                    }
+                };
+            case INTEGER:
+            case INTERVAL_YEAR_MONTH:
+                return convertToInt();
+            case BIGINT:
+            case INTERVAL_DAY_TIME:
+                return convertToLong();
+            case DATE:
+                return convertToDate();
+            case TIME_WITHOUT_TIME_ZONE:
+                return convertToTime();
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return convertToTimestamp(serverTimeZone);
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return convertToLocalTimeZoneTimestamp(serverTimeZone);
+            case FLOAT:
+                return convertToFloat();
+            case DOUBLE:
+                return convertToDouble();
+            case CHAR:
+            case VARCHAR:
+                return convertToString();
+            case BINARY:
+            case VARBINARY:
+                return convertToBinary();
+            case DECIMAL:
+                return createDecimalConverter((DecimalType) type);
+            case ROW:
+                return createRowConverter(
+                        (RowType) type, serverTimeZone, userDefinedConverterFactory);
+            case ARRAY:
+            case MAP:
+            case MULTISET:
+            case RAW:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + type);
+        }
+    }
+
+    private static DeserializationRuntimeConverter convertToBoolean() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Boolean) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Byte) {
+                    return (byte) dbzObj == 1;
+                } else if (dbzObj instanceof Short) {
+                    return (short) dbzObj == 1;
+                } else {
+                    return Boolean.parseBoolean(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToInt() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Long) {
+                    return ((Long) dbzObj).intValue();
+                } else {
+                    return Integer.parseInt(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToLong() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Integer) {
+                    return ((Integer) dbzObj).longValue();
+                } else if (dbzObj instanceof Long) {
+                    return dbzObj;
+                } else {
+                    return Long.parseLong(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToDouble() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return ((Float) dbzObj).doubleValue();
+                } else if (dbzObj instanceof Double) {
+                    return dbzObj;
+                } else {
+                    return Double.parseDouble(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToFloat() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Float) {
+                    return dbzObj;
+                } else if (dbzObj instanceof Double) {
+                    return ((Double) dbzObj).floatValue();
+                } else {
+                    return Float.parseFloat(dbzObj.toString());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToDate() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToTime() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    switch (schema.name()) {
+                        case MicroTime.SCHEMA_NAME:
+                            return (int) ((long) dbzObj / 1000);
+                        case NanoTime.SCHEMA_NAME:
+                            return (int) ((long) dbzObj / 1000_000);
+                    }
+                } else if (dbzObj instanceof Integer) {
+                    return dbzObj;
+                }
+                // get number of milliseconds of the day
+                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof Long) {
+                    switch (schema.name()) {
+                        case Timestamp.SCHEMA_NAME:
+                            return TimestampData.fromEpochMillis((Long) dbzObj);
+                        case MicroTimestamp.SCHEMA_NAME:
+                            long micro = (long) dbzObj;
+                            return TimestampData.fromEpochMillis(
+                                    micro / 1000, (int) (micro % 1000 * 1000));
+                        case NanoTimestamp.SCHEMA_NAME:
+                            long nano = (long) dbzObj;
+                            return TimestampData.fromEpochMillis(
+                                    nano / 1000_000, (int) (nano % 1000_000));
+                    }
+                }
+                LocalDateTime localDateTime =
+                        TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
+                return TimestampData.fromLocalDateTime(localDateTime);
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
+            ZoneId serverTimeZone) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof String) {
+                    String str = (String) dbzObj;
+                    // TIMESTAMP_LTZ type is encoded in string type
+                    Instant instant = Instant.parse(str);
+                    return TimestampData.fromLocalDateTime(
+                            LocalDateTime.ofInstant(instant, serverTimeZone));
+                }
+                throw new IllegalArgumentException(
+                        "Unable to convert to TimestampData from unexpected value '"
+                                + dbzObj
+                                + "' of type "
+                                + dbzObj.getClass().getName());
+            }
+        };
+    }
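[Editor's note] The MicroTimestamp and NanoTimestamp branches above split the epoch value into milliseconds plus nanos-of-millisecond for TimestampData.fromEpochMillis. A quick standalone check of the microsecond arithmetic:

// 1_716_342_415_123_456 µs since epoch = 2024-05-22T01:46:55.123456Z
long micros = 1_716_342_415_123_456L;
TimestampData ts = TimestampData.fromEpochMillis(
        micros / 1000,                  // 1_716_342_415_123 ms
        (int) (micros % 1000 * 1000));  // 456 µs -> 456_000 ns of the millisecond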
+
+    private static DeserializationRuntimeConverter convertToString() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                return StringData.fromString(dbzObj.toString());
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter convertToBinary() {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                if (dbzObj instanceof byte[]) {
+                    return dbzObj;
+                } else if (dbzObj instanceof ByteBuffer) {
+                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
+                    byte[] bytes = new byte[byteBuffer.remaining()];
+                    byteBuffer.get(bytes);
+                    return bytes;
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
+                }
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) {
+                BigDecimal bigDecimal;
+                if (dbzObj instanceof byte[]) {
+                    // decimal.handling.mode=precise
+                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
+                } else if (dbzObj instanceof String) {
+                    // decimal.handling.mode=string
+                    bigDecimal = new BigDecimal((String) dbzObj);
+                } else if (dbzObj instanceof Double) {
+                    // decimal.handling.mode=double
+                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
+                } else {
+                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
+                        SpecialValueDecimal decimal =
+                                VariableScaleDecimal.toLogical((Struct) dbzObj);
+                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
+                    } else {
+                        // fallback to string
+                        bigDecimal = new BigDecimal(dbzObj.toString());
+                    }
+                }
+                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
+            }
+        };
+    }
+
+    private static DeserializationRuntimeConverter createRowConverter(
+            RowType rowType,
+            ZoneId serverTimeZone,
+            DeserializationRuntimeConverterFactory userDefinedConverterFactory) {
+        final DeserializationRuntimeConverter[] fieldConverters =
+                rowType.getFields().stream()
+                        .map(RowType.RowField::getType)
+                        .map(
+                                logicType -> createConverter(
+                                        logicType,
+                                        serverTimeZone,
+                                        userDefinedConverterFactory))
+                        .toArray(DeserializationRuntimeConverter[]::new);
+        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
+
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                Struct struct = (Struct) dbzObj;
+                int arity = fieldNames.length;
+                GenericRowData row = new GenericRowData(arity);
+                for (int i = 0; i < arity; i++) {
+                    String fieldName = fieldNames[i];
+                    Field field = schema.field(fieldName);
+                    if (field == null) {
+                        row.setField(i, null);
+                    } else {
+                        Object fieldValue = struct.getWithoutDefault(fieldName);
+                        Schema fieldSchema = schema.field(fieldName).schema();
+                        Object convertedField =
+                                convertField(fieldConverters[i], fieldValue, fieldSchema);
+                        row.setField(i, convertedField);
+                    }
+                }
+                return row;
+            }
+        };
+    }
+
+    private static Object convertField(
+            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
+            throws Exception {
+        if (fieldValue == null) {
+            return null;
+        } else {
+            return fieldConverter.convert(fieldValue, fieldSchema);
+        }
+    }
+
+    private static DeserializationRuntimeConverter wrapIntoNullableConverter(
+            DeserializationRuntimeConverter converter) {
+        return new DeserializationRuntimeConverter() {
+
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Object dbzObj, Schema schema) throws Exception {
+                if (dbzObj == null) {
+                    return null;
+                }
+                return converter.convert(dbzObj, schema);
+            }
+        };
+    }
+}
diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 3546604c06..e3a995cc65 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -857,25 +857,27 @@ Source : org.apache.flink:flink-connector-pulsar:4.1.0-1.18
        (Please note that the software have been modified.)
 License : https://github.com/apache/flink-connector-pulsar/blob/main/LICENSE
 
- 1.3.23 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodb/MongoDBConnectorDeserializationSchema.java
        inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/mongodbMongoDBTableSource.java
 Source : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software have been modified.)
 License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
- 1.3.24 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
        inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableSource.java
        inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgresValueValidator.java
        inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/RowDataDebeziumDeserializeSchema.java
-Source : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software have been modified.)
+Source : com.ververica:flink-connector-postgres-cdc:2.3.0 (Please note that the software have been modified.)
 License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
-
- 1.3.25 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/RowDataDebeziumDeserializeSchema.java
        inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/sqlserver-cdc/src/main/java/org/apache/inlong/sort/sqlserver/SqlServerTableSource.java
-Source : com.ververica:flink-connector-mongodb-cdc:2.3.0 (Please note that the software have been modified.)
+Source : com.ververica:flink-connector-sqlserver-cdc:2.3.0 (Please note that the software have been modified.)
 License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
+
+1.3.27 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MySqlTableSource.java
+       inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java
+Source : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the software have been modified.)
 License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE