This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ec6a8dadd6 [INLONG-9223][Sort] TubeMQ source support InlongAudit (#9236) ec6a8dadd6 is described below commit ec6a8dadd698d0733f545ea0d1cbee196b549c92 Author: vernedeng <verned...@apache.org> AuthorDate: Thu Nov 9 10:22:19 2023 +0800 [INLONG-9223][Sort] TubeMQ source support InlongAudit (#9236) * [INLONG-9223][Sort] TubeMQ source support InlongAudit --- .../protocol/node/extract/TubeMQExtractNode.java | 31 +++++++++++++- .../sort-connectors/tubemq/pom.xml | 11 +++++ .../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 31 +++++++------- .../table/DynamicTubeMQDeserializationSchema.java | 29 ++++++++++++- .../tubemq/table/TubeMQDynamicTableFactory.java | 43 +++++++++++++++---- .../sort/tubemq/table/TubeMQTableSource.java | 49 ++++++++++++++++------ .../sort-connectors/tubemq/pom.xml | 6 +++ .../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 1 + .../table/DynamicTubeMQDeserializationSchema.java | 29 ++++++++++++- .../tubemq/table/TubeMQDynamicTableFactory.java | 25 +++++++++-- .../sort/tubemq/table/TubeMQTableSource.java | 29 +++++++++++-- 11 files changed, 238 insertions(+), 46 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java index cc6ac2b5f1..d67ef15a09 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java @@ -17,8 +17,11 @@ package org.apache.inlong.sort.protocol.node.extract; +import org.apache.inlong.common.enums.MetaField; import org.apache.inlong.sort.formats.util.StringUtils; import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.InlongMetric; +import org.apache.inlong.sort.protocol.Metadata; import org.apache.inlong.sort.protocol.constant.TubeMQConstant; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.protocol.node.format.Format; @@ -35,8 +38,10 @@ import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.Serializable; +import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeSet; /** @@ -45,7 +50,7 @@ import java.util.TreeSet; @EqualsAndHashCode(callSuper = true) @JsonTypeName("tubeMQExtract") @Data -public class TubeMQExtractNode extends ExtractNode implements Serializable { +public class TubeMQExtractNode extends ExtractNode implements Serializable, InlongMetric, Metadata { private static final long serialVersionUID = -2544747886429528474L; @@ -119,4 +124,28 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable { return String.format("table_%s", super.getId()); } + @Override + public String getMetadataKey(MetaField metaField) { + String metadataKey; + switch (metaField) { + case AUDIT_DATA_TIME: + metadataKey = "value.data-time"; + break; + default: + throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", + this.getClass().getSimpleName(), metaField)); + } + return metadataKey; + } + + @Override + public boolean isVirtual(MetaField metaField) { + return true; + } + + @Override + public Set<MetaField> supportedMetaFields() { + return EnumSet.of(MetaField.AUDIT_DATA_TIME); + } + } diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml index d0cca1e07b..19dd75f435 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/pom.xml @@ -46,6 +46,17 @@ <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-base</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index 96c27d2b9b..abd69f8ecb 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -68,8 +68,6 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> private static final Logger LOG = LoggerFactory.getLogger(FlinkTubeMQConsumer.class); private static final String TUBE_OFFSET_STATE = "tube-offset-state"; - private static final String SPLIT_COMMA = ","; - private static final String SPLIT_COLON = ":"; /** * The address of TubeMQ master, format eg: 127.0.0.1:8715,127.0.0.2:8715. @@ -82,9 +80,9 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> private final String topic; /** - * The tubemq consumers use this tid set to filter records reading from server. + * The tubemq consumers use this streamId set to filter records reading from server. */ - private final TreeSet<String> tidSet; + private final TreeSet<String> streamIdSet; /** * The consumer group name. @@ -130,7 +128,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> /** * The current offsets of partitions which are stored in {@link #offsetsState} * once a checkpoint is triggered. - * + * <p> * NOTE: The offsets are populated in the main thread and saved in the * checkpoint thread. Its usage must be guarded by the checkpoint lock.</p> */ @@ -147,18 +145,18 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> /** * Build a TubeMQ source function * - * @param masterAddress the master address of TubeMQ - * @param topic the topic name - * @param tidSet the topic's filter condition items - * @param consumerGroup the consumer group name + * @param masterAddress the master address of TubeMQ + * @param topic the topic name + * @param streamIdSet the topic's filter condition items + * @param consumerGroup the consumer group name * @param deserializationSchema the deserialize schema - * @param configuration the configure - * @param sessionKey the tube session key + * @param configuration the configure + * @param sessionKey the tube session key */ public FlinkTubeMQConsumer( String masterAddress, String topic, - TreeSet<String> tidSet, + TreeSet<String> streamIdSet, String consumerGroup, DeserializationSchema<T> deserializationSchema, Configuration configuration, @@ -166,14 +164,14 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> Boolean innerFormat) { checkNotNull(masterAddress, "The master address must not be null."); checkNotNull(topic, "The topic must not be null."); - checkNotNull(tidSet, "The tid set must not be null."); + checkNotNull(streamIdSet, "The streamId set must not be null."); checkNotNull(consumerGroup, "The consumer group must not be null."); checkNotNull(deserializationSchema, "The deserialization schema must not be null."); checkNotNull(configuration, "The configuration must not be null."); this.masterAddress = masterAddress; this.topic = topic; - this.tidSet = tidSet; + this.streamIdSet = streamIdSet; this.consumerGroup = consumerGroup; this.deserializationSchema = deserializationSchema; this.sessionKey = sessionKey; @@ -210,6 +208,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> @Override public void open(Configuration parameters) throws Exception { + deserializationSchema.open(null); ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup); consumerConfig.setConsumePosition(consumeFromMax ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET @@ -220,7 +219,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> final int numTasks = getRuntimeContext().getNumberOfParallelSubtasks(); messageSessionFactory = new TubeSingleSessionFactory(consumerConfig); messagePullConsumer = messageSessionFactory.createPullConsumer(consumerConfig); - messagePullConsumer.subscribe(topic, tidSet); + messagePullConsumer.subscribe(topic, streamIdSet); String jobId = getRuntimeContext().getJobId().toString(); messagePullConsumer.completeSubscribe(sessionKey.concat(jobId), numTasks, true, currentOffsets); @@ -305,7 +304,9 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> rowDataList.forEach(data -> records.add((T) data)); } } + return lastConsumeInstant; + } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java index f5880f1a78..f68bc4cf5e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java @@ -17,22 +17,31 @@ package org.apache.inlong.sort.tubemq.table; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricsCollector; +import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.tubemq.corebase.Message; import com.google.common.base.Objects; +import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.stream.Collectors; public class DynamicTubeMQDeserializationSchema implements DeserializationSchema<RowData> { + private static final Logger LOG = LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class); /** * data buffer message */ @@ -53,15 +62,28 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema */ private final boolean ignoreErrors; + private SourceMetricData sourceMetricData; + + private MetricOption metricOption; + public DynamicTubeMQDeserializationSchema( DeserializationSchema<RowData> schema, MetadataConverter[] metadataConverters, TypeInformation<RowData> producedTypeInfo, - boolean ignoreErrors) { + boolean ignoreErrors, + MetricOption metricOption) { this.deserializationSchema = schema; this.metadataConverters = metadataConverters; this.producedTypeInfo = producedTypeInfo; this.ignoreErrors = ignoreErrors; + this.metricOption = metricOption; + } + + @Override + public void open(InitializationContext context) { + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption); + } } @Override @@ -71,7 +93,10 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema @Override public void deserialize(byte[] message, Collector<RowData> out) throws IOException { - deserializationSchema.deserialize(message, out); + List<RowData> rows = new ArrayList<>(); + deserializationSchema.deserialize(message, + new MetricsCollector<>(new ListCollector<>(rows), sourceMetricData)); + rows.forEach(out::collect); } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java index 43fb3e198e..6af6f8f645 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java @@ -18,6 +18,7 @@ package org.apache.inlong.sort.tubemq.table; import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; @@ -25,6 +26,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; import org.apache.flink.table.connector.format.Format; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.data.RowData; @@ -32,6 +34,7 @@ import org.apache.flink.table.factories.DeserializationFormatFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; +import org.apache.flink.table.factories.SerializationFormatFactory; import org.apache.flink.table.types.DataType; import org.apache.flink.types.RowKind; @@ -41,6 +44,9 @@ import java.util.Set; import java.util.TreeSet; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; +import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT; @@ -68,13 +74,18 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory { .orElseGet(() -> helper.discoverDecodingFormat(DeserializationFormatFactory.class, FORMAT)); } + private static EncodingFormat<SerializationSchema<RowData>> getValueEncodingFormat( + TableFactoryHelper helper) { + return helper.discoverOptionalEncodingFormat(SerializationFormatFactory.class, FORMAT) + .orElseGet(() -> helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT)); + } + private static void validatePKConstraints( ObjectIdentifier tableName, CatalogTable catalogTable, Format format) { if (catalogTable.getSchema().getPrimaryKey().isPresent() && format.getChangelogMode().containsOnly(RowKind.INSERT)) { Configuration options = Configuration.fromMap(catalogTable.getOptions()); String formatName = options.getOptional(FORMAT).orElse(options.get(FORMAT)); - innerFormat = INNERFORMATTYPE.equals(formatName); throw new ValidationException(String.format( "The TubeMQ table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint" + " on the table, because it can't guarantee the semantic of primary key.", @@ -110,10 +121,15 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory { helper.validateExcept(INNERFORMATTYPE); validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat); + innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT)); final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions()); - final DataType physicalDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType(); + final DataType physicalDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(); + + String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); + String auditHostAndPorts = tableOptions.get(INLONG_AUDIT); + String auditKeys = tableOptions.get(AUDIT_KEYS); return createTubeMQTableSource( physicalDataType, @@ -123,7 +139,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory { TubeMQOptions.getTiSet(tableOptions), TubeMQOptions.getConsumerGroup(tableOptions), TubeMQOptions.getSessionKey(tableOptions), - properties); + properties, + inlongMetric, + auditHostAndPorts, + auditKeys); } protected TubeMQTableSource createTubeMQTableSource( @@ -131,23 +150,29 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory { DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, String topic, String url, - TreeSet<String> tid, + TreeSet<String> streamId, String consumerGroup, String sessionKey, - Configuration properties) { + Configuration properties, + String inlongMetric, + String auditHostAndPorts, + String auditKeys) { return new TubeMQTableSource( physicalDataType, valueDecodingFormat, url, topic, - tid, + streamId, consumerGroup, sessionKey, properties, null, null, false, - innerFormat); + innerFormat, + inlongMetric, + auditHostAndPorts, + auditKeys); } @Override @@ -172,6 +197,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory { options.add(SESSION_KEY); options.add(BOOTSTRAP_FROM_MAX); options.add(TOPIC_PATTERN); + options.add(AUDIT_KEYS); + options.add(INLONG_METRIC); + options.add(INLONG_AUDIT); return options; } + } diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java index c2642fd351..4a1d332ca8 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.tubemq.table; +import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer; import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter; import org.apache.inlong.tubemq.corebase.Message; @@ -40,6 +41,8 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -62,6 +65,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada private static final String VALUE_METADATA_PREFIX = "value."; + private static final Logger LOG = LoggerFactory.getLogger(TubeMQTableSource.class); + // -------------------------------------------------------------------------------------------- // Mutable attributes // -------------------------------------------------------------------------------------------- @@ -84,9 +89,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada */ private final String topic; /** - * The TubeMQ tid filter collection. + * The TubeMQ streamId filter collection. */ - private final TreeSet<String> tidSet; + private final TreeSet<String> streamIdSet; /** * The TubeMQ consumer group name. */ @@ -120,6 +125,11 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada * Metadata that is appended at the end of a physical source row. */ protected List<String> metadataKeys; + + private String inlongMetric; + private String auditHostAndPorts; + private String auditKeys; + /** * Watermark strategy that is used to generate per-partition watermark. */ @@ -129,15 +139,16 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada public TubeMQTableSource(DataType physicalDataType, DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat, String masterAddress, String topic, - TreeSet<String> tidSet, String consumerGroup, String sessionKey, + TreeSet<String> streamIdSet, String consumerGroup, String sessionKey, Configuration configuration, @Nullable WatermarkStrategy<RowData> watermarkStrategy, - Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat) { + Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat, + String inlongMetric, String auditHostAndPorts, String auditKeys) { Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null."); Preconditions.checkNotNull(valueDecodingFormat, "The deserialization schema must not be null."); Preconditions.checkNotNull(masterAddress, "The master address must not be null."); Preconditions.checkNotNull(topic, "The topic must not be null."); - Preconditions.checkNotNull(tidSet, "The tid set must not be null."); + Preconditions.checkNotNull(streamIdSet, "The streamId set must not be null."); Preconditions.checkNotNull(consumerGroup, "The consumer group must not be null."); Preconditions.checkNotNull(configuration, "The configuration must not be null."); @@ -147,7 +158,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada this.valueDecodingFormat = valueDecodingFormat; this.masterAddress = masterAddress; this.topic = topic; - this.tidSet = tidSet; + this.streamIdSet = streamIdSet; this.consumerGroup = consumerGroup; this.sessionKey = sessionKey; this.configuration = configuration; @@ -155,6 +166,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada this.proctimeAttribute = proctimeAttribute; this.ignoreErrors = ignoreErrors; this.innerFormat = innerFormat; + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; + this.auditKeys = auditKeys; } @Override @@ -167,6 +181,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada final LogicalType physicalType = physicalDataType.getLogicalType(); final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType); final IntStream physicalFields = IntStream.range(0, physicalFieldCount); + final DeserializationSchema<RowData> deserialization = createDeserialization(context, valueDecodingFormat, physicalFields.toArray(), null); @@ -182,8 +197,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada public DynamicTableSource copy() { return new TubeMQTableSource( physicalDataType, valueDecodingFormat, masterAddress, - topic, tidSet, consumerGroup, sessionKey, configuration, - watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat); + topic, streamIdSet, consumerGroup, sessionKey, configuration, + watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat, + inlongMetric, auditHostAndPorts, auditKeys); } @Override @@ -247,7 +263,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada && Objects.equals(valueDecodingFormat, that.valueDecodingFormat) && Objects.equals(masterAddress, that.masterAddress) && Objects.equals(topic, that.topic) - && Objects.equals(String.valueOf(tidSet), String.valueOf(that.tidSet)) + && Objects.equals(String.valueOf(streamIdSet), String.valueOf(that.streamIdSet)) && Objects.equals(consumerGroup, that.consumerGroup) && Objects.equals(proctimeAttribute, that.proctimeAttribute) && Objects.equals(watermarkStrategy, that.watermarkStrategy); @@ -260,7 +276,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada valueDecodingFormat, masterAddress, topic, - tidSet, + streamIdSet, consumerGroup, configuration, watermarkStrategy, @@ -273,7 +289,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada @Nullable private DeserializationSchema<RowData> createDeserialization( - DynamicTableSource.Context context, + Context context, @Nullable DecodingFormat<DeserializationSchema<RowData>> format, int[] projection, @Nullable String prefix) { @@ -299,10 +315,17 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada .orElseThrow(IllegalStateException::new)) .map(m -> m.converter) .toArray(MetadataConverter[]::new); + + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(auditHostAndPorts) + .withAuditKeys(auditKeys) + .build(); + final DeserializationSchema<RowData> tubeMQDeserializer = new DynamicTubeMQDeserializationSchema( - deserialization, metadataConverters, producedTypeInfo, ignoreErrors); + deserialization, metadataConverters, producedTypeInfo, ignoreErrors, metricOption); - final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, tidSet, + final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, streamIdSet, consumerGroup, tubeMQDeserializer, configuration, sessionKey, innerFormat); return tubeMQConsumer; } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml index aec6e19919..5cc11b3783 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml @@ -51,6 +51,12 @@ <groupId>org.apache.inlong</groupId> <artifactId>sort-connector-base</artifactId> <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + </exclusion> + </exclusions> </dependency> </dependencies> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index cafa653f65..abd69f8ecb 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -208,6 +208,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> @Override public void open(Configuration parameters) throws Exception { + deserializationSchema.open(null); ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup); consumerConfig.setConsumePosition(consumeFromMax ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java index f5880f1a78..f68bc4cf5e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java @@ -17,22 +17,31 @@ package org.apache.inlong.sort.tubemq.table; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricsCollector; +import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.tubemq.corebase.Message; import com.google.common.base.Objects; +import org.apache.flink.api.common.functions.util.ListCollector; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.stream.Collectors; public class DynamicTubeMQDeserializationSchema implements DeserializationSchema<RowData> { + private static final Logger LOG = LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class); /** * data buffer message */ @@ -53,15 +62,28 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema */ private final boolean ignoreErrors; + private SourceMetricData sourceMetricData; + + private MetricOption metricOption; + public DynamicTubeMQDeserializationSchema( DeserializationSchema<RowData> schema, MetadataConverter[] metadataConverters, TypeInformation<RowData> producedTypeInfo, - boolean ignoreErrors) { + boolean ignoreErrors, + MetricOption metricOption) { this.deserializationSchema = schema; this.metadataConverters = metadataConverters; this.producedTypeInfo = producedTypeInfo; this.ignoreErrors = ignoreErrors; + this.metricOption = metricOption; + } + + @Override + public void open(InitializationContext context) { + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption); + } } @Override @@ -71,7 +93,10 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema @Override public void deserialize(byte[] message, Collector<RowData> out) throws IOException { - deserializationSchema.deserialize(message, out); + List<RowData> rows = new ArrayList<>(); + deserializationSchema.deserialize(message, + new MetricsCollector<>(new ListCollector<>(rows), sourceMetricData)); + rows.forEach(out::collect); } @Override diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java index 0472353037..e3a0a0c6d7 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java @@ -46,6 +46,9 @@ import java.util.Set; import java.util.TreeSet; import static org.apache.flink.table.factories.FactoryUtil.FORMAT; +import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.BOOTSTRAP_FROM_MAX; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.CONSUME_GROUP; import static org.apache.inlong.sort.tubemq.table.TubeMQOptions.KEY_FORMAT; @@ -126,6 +129,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn final DataType physicalDataType = context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(); + String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null); + String auditHostAndPorts = tableOptions.get(INLONG_AUDIT); + String auditKeys = tableOptions.get(AUDIT_KEYS); + return createTubeMQTableSource( physicalDataType, valueDecodingFormat, @@ -134,7 +141,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn TubeMQOptions.getTiSet(tableOptions), TubeMQOptions.getConsumerGroup(tableOptions), TubeMQOptions.getSessionKey(tableOptions), - properties); + properties, + inlongMetric, + auditHostAndPorts, + auditKeys); } @Override @@ -171,7 +181,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn TreeSet<String> streamId, String consumerGroup, String sessionKey, - Configuration properties) { + Configuration properties, + String inlongMetric, + String auditHostAndPorts, + String auditKeys) { return new TubeMQTableSource( physicalDataType, valueDecodingFormat, @@ -184,7 +197,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn null, null, false, - innerFormat); + innerFormat, + inlongMetric, + auditHostAndPorts, + auditKeys); } protected TubeMQTableSink createTubeMQTableSink( @@ -225,6 +241,9 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn options.add(SESSION_KEY); options.add(BOOTSTRAP_FROM_MAX); options.add(TOPIC_PATTERN); + options.add(AUDIT_KEYS); + options.add(INLONG_METRIC); + options.add(INLONG_AUDIT); return options; } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java index e79685ff70..4a1d332ca8 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.tubemq.table; +import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer; import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter; import org.apache.inlong.tubemq.corebase.Message; @@ -40,6 +41,8 @@ import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; @@ -62,6 +65,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada private static final String VALUE_METADATA_PREFIX = "value."; + private static final Logger LOG = LoggerFactory.getLogger(TubeMQTableSource.class); + // -------------------------------------------------------------------------------------------- // Mutable attributes // -------------------------------------------------------------------------------------------- @@ -120,6 +125,11 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada * Metadata that is appended at the end of a physical source row. */ protected List<String> metadataKeys; + + private String inlongMetric; + private String auditHostAndPorts; + private String auditKeys; + /** * Watermark strategy that is used to generate per-partition watermark. */ @@ -131,7 +141,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada String masterAddress, String topic, TreeSet<String> streamIdSet, String consumerGroup, String sessionKey, Configuration configuration, @Nullable WatermarkStrategy<RowData> watermarkStrategy, - Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat) { + Optional<String> proctimeAttribute, Boolean ignoreErrors, Boolean innerFormat, + String inlongMetric, String auditHostAndPorts, String auditKeys) { Preconditions.checkNotNull(physicalDataType, "Physical data type must not be null."); Preconditions.checkNotNull(valueDecodingFormat, "The deserialization schema must not be null."); @@ -155,6 +166,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada this.proctimeAttribute = proctimeAttribute; this.ignoreErrors = ignoreErrors; this.innerFormat = innerFormat; + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; + this.auditKeys = auditKeys; } @Override @@ -167,6 +181,7 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada final LogicalType physicalType = physicalDataType.getLogicalType(); final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType); final IntStream physicalFields = IntStream.range(0, physicalFieldCount); + final DeserializationSchema<RowData> deserialization = createDeserialization(context, valueDecodingFormat, physicalFields.toArray(), null); @@ -183,7 +198,8 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada return new TubeMQTableSource( physicalDataType, valueDecodingFormat, masterAddress, topic, streamIdSet, consumerGroup, sessionKey, configuration, - watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat); + watermarkStrategy, proctimeAttribute, ignoreErrors, innerFormat, + inlongMetric, auditHostAndPorts, auditKeys); } @Override @@ -299,8 +315,15 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada .orElseThrow(IllegalStateException::new)) .map(m -> m.converter) .toArray(MetadataConverter[]::new); + + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(auditHostAndPorts) + .withAuditKeys(auditKeys) + .build(); + final DeserializationSchema<RowData> tubeMQDeserializer = new DynamicTubeMQDeserializationSchema( - deserialization, metadataConverters, producedTypeInfo, ignoreErrors); + deserialization, metadataConverters, producedTypeInfo, ignoreErrors, metricOption); final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, streamIdSet, consumerGroup, tubeMQDeserializer, configuration, sessionKey, innerFormat);