This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ca0abf2438 [INLONG-9247][Sort] TubeMQ source support audit when the deserialized type is not InlongMsg (#9258) ca0abf2438 is described below commit ca0abf243863cac31fe9f89da2c01a812e347b65 Author: vernedeng <verned...@apache.org> AuthorDate: Mon Nov 13 10:51:32 2023 +0800 [INLONG-9247][Sort] TubeMQ source support audit when the deserialized type is not InlongMsg (#9258) --- .../inlong/sort/protocol/node/ExtractNode.java | 4 + .../protocol/node/extract/TubeMQExtractNode.java | 9 +- .../sort-connectors/tubemq/pom.xml | 5 + .../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 12 +- .../table/DynamicTubeMQDeserializationSchema.java | 148 ++++----------------- ...> DynamicTubeMQTableDeserializationSchema.java} | 41 +++--- .../tubemq/table/TubeMQDynamicTableFactory.java | 10 +- .../sort/tubemq/table/TubeMQTableSource.java | 13 +- 8 files changed, 85 insertions(+), 157 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java index b9d649d621..fc68f0f356 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java @@ -73,6 +73,10 @@ public abstract class ExtractNode implements Node { public static final String INLONG_MSG = "inlong-msg"; + public static final String INLONG_MSG_AUDIT_TIME = "value.data-time"; + + public static final String CONSUME_AUDIT_TIME = "consume_time"; + @JsonProperty("id") private String id; @JsonInclude(Include.NON_NULL) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java index d67ef15a09..327cb522a2 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/TubeMQExtractNode.java @@ -25,6 +25,7 @@ import org.apache.inlong.sort.protocol.Metadata; import org.apache.inlong.sort.protocol.constant.TubeMQConstant; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.protocol.node.format.Format; +import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat; import org.apache.inlong.sort.protocol.transformation.WatermarkField; import com.google.common.base.Preconditions; @@ -129,10 +130,14 @@ public class TubeMQExtractNode extends ExtractNode implements Serializable, Inlo String metadataKey; switch (metaField) { case AUDIT_DATA_TIME: - metadataKey = "value.data-time"; + if (format instanceof InLongMsgFormat) { + metadataKey = INLONG_MSG_AUDIT_TIME; + } else { + metadataKey = CONSUME_AUDIT_TIME; + } break; default: - throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", + throw new UnsupportedOperationException(String.format("Unsupported meta field for %s: %s", this.getClass().getSimpleName(), metaField)); } return metadataKey; diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml index 5cc11b3783..898c56643c 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/pom.xml @@ -58,6 +58,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-common</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java index abd69f8ecb..b9fb6d1b0d 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.tubemq; +import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema; import org.apache.inlong.sort.tubemq.table.TubeMQOptions; import org.apache.inlong.tubemq.client.config.ConsumerConfig; import org.apache.inlong.tubemq.client.consumer.ConsumePosition; @@ -27,7 +28,6 @@ import org.apache.inlong.tubemq.corebase.Message; import org.apache.inlong.tubemq.corebase.TErrCodeConstants; import org.apache.flink.api.common.functions.util.ListCollector; -import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; @@ -92,7 +92,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> /** * The deserializer for records. */ - private final DeserializationSchema<T> deserializationSchema; + private final DynamicTubeMQDeserializationSchema<T> deserializationSchema; /** * The random key for TubeMQ consumer group when startup. @@ -158,7 +158,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> String topic, TreeSet<String> streamIdSet, String consumerGroup, - DeserializationSchema<T> deserializationSchema, + DynamicTubeMQDeserializationSchema<T> deserializationSchema, Configuration configuration, String sessionKey, Boolean innerFormat) { @@ -208,7 +208,7 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> @Override public void open(Configuration parameters) throws Exception { - deserializationSchema.open(null); + deserializationSchema.open(); ConsumerConfig consumerConfig = new ConsumerConfig(masterAddress, consumerGroup); consumerConfig.setConsumePosition(consumeFromMax ? ConsumePosition.CONSUMER_FROM_LATEST_OFFSET @@ -292,14 +292,14 @@ public class FlinkTubeMQConsumer<T> extends RichParallelSourceFunction<T> lastConsumeInstant = Instant.now(); if (!innerFormat) { for (Message message : messageList) { - T record = deserializationSchema.deserialize(message.getData()); + T record = deserializationSchema.deserialize(message); records.add(record); } } else { List<RowData> rowDataList = new ArrayList<>(); ListCollector<RowData> out = new ListCollector<>(rowDataList); for (Message message : messageList) { - deserializationSchema.deserialize(message.getData(), (Collector<T>) out); + deserializationSchema.deserialize(message, (Collector<T>) out); } rowDataList.forEach(data -> records.add((T) data)); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java index f68bc4cf5e..4c4eaac841 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java @@ -17,143 +17,45 @@ package org.apache.inlong.sort.tubemq.table; -import org.apache.inlong.sort.base.metric.MetricOption; -import org.apache.inlong.sort.base.metric.MetricsCollector; -import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.tubemq.corebase.Message; -import com.google.common.base.Objects; -import org.apache.flink.api.common.functions.util.ListCollector; -import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.table.data.GenericRowData; -import org.apache.flink.table.data.RowData; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.stream.Collectors; -public class DynamicTubeMQDeserializationSchema implements DeserializationSchema<RowData> { +public interface DynamicTubeMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { - private static final Logger LOG = LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class); - /** - * data buffer message - */ - private final DeserializationSchema<RowData> deserializationSchema; - - /** - * {@link MetadataConverter} of how to produce metadata from message. - */ - private final MetadataConverter[] metadataConverters; + @PublicEvolving + default void open() throws Exception { + } /** - * {@link TypeInformation} of the produced {@link RowData} (physical + meta data). + * Deserializes the byte message. + * + * @param message The message, as a byte array. + * @return The deserialized message as an object (null if the message cannot be deserialized). */ - private final TypeInformation<RowData> producedTypeInfo; + T deserialize(Message message) throws IOException; /** - * status of error + * Deserializes the byte message. + * + * <p>Can output multiple records through the {@link Collector}. Note that number and size of + * the produced records should be relatively small. Depending on the source implementation + * records can be buffered in memory or collecting records might delay emitting checkpoint + * barrier. + * + * @param message The message, as a byte array. + * @param out The collector to put the resulting messages. */ - private final boolean ignoreErrors; - - private SourceMetricData sourceMetricData; - - private MetricOption metricOption; - - public DynamicTubeMQDeserializationSchema( - DeserializationSchema<RowData> schema, - MetadataConverter[] metadataConverters, - TypeInformation<RowData> producedTypeInfo, - boolean ignoreErrors, - MetricOption metricOption) { - this.deserializationSchema = schema; - this.metadataConverters = metadataConverters; - this.producedTypeInfo = producedTypeInfo; - this.ignoreErrors = ignoreErrors; - this.metricOption = metricOption; - } - - @Override - public void open(InitializationContext context) { - if (metricOption != null) { - sourceMetricData = new SourceMetricData(metricOption); + @PublicEvolving + default void deserialize(Message message, Collector<T> out) throws IOException { + T deserialize = deserialize(message); + if (deserialize != null) { + out.collect(deserialize); } } - - @Override - public RowData deserialize(byte[] bytes) throws IOException { - return deserializationSchema.deserialize(bytes); - } - - @Override - public void deserialize(byte[] message, Collector<RowData> out) throws IOException { - List<RowData> rows = new ArrayList<>(); - deserializationSchema.deserialize(message, - new MetricsCollector<>(new ListCollector<>(rows), sourceMetricData)); - rows.forEach(out::collect); - } - - @Override - public boolean isEndOfStream(RowData rowData) { - return false; - } - - @Override - public TypeInformation<RowData> getProducedType() { - return producedTypeInfo; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof DynamicTubeMQDeserializationSchema)) { - return false; - } - DynamicTubeMQDeserializationSchema that = (DynamicTubeMQDeserializationSchema) o; - return ignoreErrors == that.ignoreErrors - && Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()), - Arrays.stream(that.metadataConverters).collect(Collectors.toList())) - && Objects.equal(deserializationSchema, that.deserializationSchema) - && Objects.equal(producedTypeInfo, that.producedTypeInfo); - } - - @Override - public int hashCode() { - return Objects.hashCode(deserializationSchema, metadataConverters, producedTypeInfo, ignoreErrors); - } - - /** - * add metadata column - */ - private void emitRow(Message head, GenericRowData physicalRow, Collector<RowData> out) { - if (metadataConverters.length == 0) { - out.collect(physicalRow); - return; - } - final int physicalArity = physicalRow.getArity(); - final int metadataArity = metadataConverters.length; - final GenericRowData producedRow = - new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity); - for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) { - producedRow.setField(physicalPos, physicalRow.getField(physicalPos)); - } - for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) { - producedRow.setField( - physicalArity + metadataPos, metadataConverters[metadataPos].read(head)); - } - out.collect(producedRow); - } - - interface MetadataConverter extends Serializable { - - Object read(Message head); - } } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java similarity index 81% copy from inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java copy to inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java index f68bc4cf5e..8ee154c535 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java @@ -29,8 +29,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Collector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -39,9 +37,8 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; -public class DynamicTubeMQDeserializationSchema implements DeserializationSchema<RowData> { +public class DynamicTubeMQTableDeserializationSchema implements DynamicTubeMQDeserializationSchema<RowData> { - private static final Logger LOG = LoggerFactory.getLogger(DynamicTubeMQDeserializationSchema.class); /** * data buffer message */ @@ -62,46 +59,54 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema */ private final boolean ignoreErrors; + private final boolean innerFormat; + private SourceMetricData sourceMetricData; private MetricOption metricOption; - public DynamicTubeMQDeserializationSchema( + public DynamicTubeMQTableDeserializationSchema( DeserializationSchema<RowData> schema, MetadataConverter[] metadataConverters, TypeInformation<RowData> producedTypeInfo, boolean ignoreErrors, + boolean innerFormat, MetricOption metricOption) { this.deserializationSchema = schema; this.metadataConverters = metadataConverters; this.producedTypeInfo = producedTypeInfo; this.ignoreErrors = ignoreErrors; + this.innerFormat = innerFormat; this.metricOption = metricOption; } @Override - public void open(InitializationContext context) { + public void open() { if (metricOption != null) { sourceMetricData = new SourceMetricData(metricOption); } } @Override - public RowData deserialize(byte[] bytes) throws IOException { - return deserializationSchema.deserialize(bytes); + public RowData deserialize(Message message) throws IOException { + return deserializationSchema.deserialize(message.getData()); } @Override - public void deserialize(byte[] message, Collector<RowData> out) throws IOException { + public void deserialize(Message message, Collector<RowData> out) throws IOException { List<RowData> rows = new ArrayList<>(); - deserializationSchema.deserialize(message, - new MetricsCollector<>(new ListCollector<>(rows), sourceMetricData)); - rows.forEach(out::collect); - } - @Override - public boolean isEndOfStream(RowData rowData) { - return false; + MetricsCollector<RowData> metricsCollector = + new MetricsCollector<>(new ListCollector<>(rows), sourceMetricData); + + // reset time stamp if the deserialize schema has not inner format + if (!innerFormat) { + metricsCollector.resetTimestamp(System.currentTimeMillis()); + } + deserializationSchema.deserialize(message.getData(), metricsCollector); + + rows.forEach(row -> emitRow(message, (GenericRowData) row, out)); + } @Override @@ -114,10 +119,10 @@ public class DynamicTubeMQDeserializationSchema implements DeserializationSchema if (this == o) { return true; } - if (!(o instanceof DynamicTubeMQDeserializationSchema)) { + if (!(o instanceof DynamicTubeMQTableDeserializationSchema)) { return false; } - DynamicTubeMQDeserializationSchema that = (DynamicTubeMQDeserializationSchema) o; + DynamicTubeMQTableDeserializationSchema that = (DynamicTubeMQTableDeserializationSchema) o; return ignoreErrors == that.ignoreErrors && Objects.equal(Arrays.stream(metadataConverters).collect(Collectors.toList()), Arrays.stream(that.metadataConverters).collect(Collectors.toList())) diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java index e3a0a0c6d7..4962364a9c 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQDynamicTableFactory.java @@ -17,6 +17,8 @@ package org.apache.inlong.sort.tubemq.table; +import org.apache.inlong.sort.protocol.node.ExtractNode; + import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.ConfigOption; @@ -66,8 +68,6 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn public static final String IDENTIFIER = "tubemq-inlong"; - public static final String INNERFORMATTYPE = "inlong-msg"; - public static boolean innerFormat = false; private static DecodingFormat<DeserializationSchema<RowData>> getValueDecodingFormat( @@ -120,10 +120,10 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat = getValueDecodingFormat(helper); // validate all options - helper.validateExcept(INNERFORMATTYPE); + helper.validateExcept(ExtractNode.INLONG_MSG); validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueDecodingFormat); - innerFormat = INNERFORMATTYPE.equals(tableOptions.get(FORMAT)); + innerFormat = ExtractNode.INLONG_MSG.equals(tableOptions.get(FORMAT)); final Configuration properties = getTubeMQProperties(context.getCatalogTable().getOptions()); @@ -156,7 +156,7 @@ public class TubeMQDynamicTableFactory implements DynamicTableSourceFactory, Dyn final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = getValueEncodingFormat(helper); // validate all options - helper.validateExcept(INNERFORMATTYPE); + helper.validateExcept(ExtractNode.INLONG_MSG); validatePKConstraints(context.getObjectIdentifier(), context.getCatalogTable(), valueEncodingFormat); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java index 4a1d332ca8..2d5ddbb3d5 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQTableSource.java @@ -18,8 +18,9 @@ package org.apache.inlong.sort.tubemq.table; import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.tubemq.FlinkTubeMQConsumer; -import org.apache.inlong.sort.tubemq.table.DynamicTubeMQDeserializationSchema.MetadataConverter; +import org.apache.inlong.sort.tubemq.table.DynamicTubeMQTableDeserializationSchema.MetadataConverter; import org.apache.inlong.tubemq.corebase.Message; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -322,8 +323,9 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada .withAuditKeys(auditKeys) .build(); - final DeserializationSchema<RowData> tubeMQDeserializer = new DynamicTubeMQDeserializationSchema( - deserialization, metadataConverters, producedTypeInfo, ignoreErrors, metricOption); + final DynamicTubeMQDeserializationSchema<RowData> tubeMQDeserializer = + new DynamicTubeMQTableDeserializationSchema( + deserialization, metadataConverters, producedTypeInfo, ignoreErrors, innerFormat, metricOption); final FlinkTubeMQConsumer<RowData> tubeMQConsumer = new FlinkTubeMQConsumer(masterAddress, topic, streamIdSet, consumerGroup, tubeMQDeserializer, configuration, sessionKey, innerFormat); @@ -336,6 +338,11 @@ public class TubeMQTableSource implements ScanTableSource, SupportsReadingMetada enum ReadableMetadata { + CONSUME_TIME( + ExtractNode.CONSUME_AUDIT_TIME, + DataTypes.BIGINT().notNull(), + m -> System.currentTimeMillis()), + TOPIC( "topic", DataTypes.STRING().notNull(),