This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new c4553ddc46 [INLONG-9202][Sort] Fix audit report error when running pulsar -> iceberg in flink1.15 (#9206) c4553ddc46 is described below commit c4553ddc46f455e8efa8a3b286cf2783dfcc58bd Author: Sting <zpen...@connect.ust.hk> AuthorDate: Fri Nov 3 15:21:01 2023 +0800 [INLONG-9202][Sort] Fix audit report error when running pulsar -> iceberg in flink1.15 (#9206) --- .../org/apache/inlong/audit/AuditOperator.java | 4 ++- .../protocol/node/extract/PulsarExtractNode.java | 31 +++++++++++++++++++++- .../sort/protocol/node/load/IcebergLoadNode.java | 16 ++++++++++- inlong-sort/sort-core/pom.xml | 6 +++++ .../org/apache/inlong/sort/base/Constants.java | 8 ++++++ .../sort-connectors/pulsar/pom.xml | 2 -- .../table/PulsarTableDeserializationSchema.java | 10 +++++-- .../PulsarTableDeserializationSchemaFactory.java | 6 +---- .../formats/inlongmsg/InLongMsgDecodingFormat.java | 13 +++++++++ .../sort/formats/inlongmsg/InLongMsgUtils.java | 3 +++ 10 files changed, 87 insertions(+), 12 deletions(-) diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java index a89c30f6c0..e720a75cb4 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditOperator.java @@ -26,6 +26,7 @@ import org.apache.inlong.audit.util.StatInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; @@ -42,8 +43,9 @@ import static org.apache.inlong.audit.protocol.AuditApi.BaseCommand.Type.AUDIT_R /** * Audit operator, which is singleton. */ -public class AuditOperator { +public class AuditOperator implements Serializable { + private static final long serialVersionUID = 1L; private static final Logger LOGGER = LoggerFactory.getLogger(AuditOperator.class); private static final String FIELD_SEPARATORS = ":"; private static final String DEFAULT_AUDIT_TAG = "-1"; diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java index cba35f8683..d6634013da 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/PulsarExtractNode.java @@ -17,8 +17,10 @@ package org.apache.inlong.sort.protocol.node.extract; +import org.apache.inlong.common.enums.MetaField; import org.apache.inlong.sort.protocol.FieldInfo; import org.apache.inlong.sort.protocol.InlongMetric; +import org.apache.inlong.sort.protocol.Metadata; import org.apache.inlong.sort.protocol.node.ExtractNode; import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.transformation.WatermarkField; @@ -34,13 +36,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.Set; @EqualsAndHashCode(callSuper = true) @JsonTypeName("pulsarExtract") @Data -public class PulsarExtractNode extends ExtractNode implements InlongMetric { +public class PulsarExtractNode extends ExtractNode implements InlongMetric, Metadata { private static final long serialVersionUID = 1L; @@ -136,4 +140,29 @@ public class PulsarExtractNode extends ExtractNode implements InlongMetric { public List<FieldInfo> getPartitionFields() { return super.getPartitionFields(); } + + @Override + public String getMetadataKey(MetaField metaField) { + String metadataKey; + switch (metaField) { + case AUDIT_DATA_TIME: + metadataKey = "value.data-time"; + break; + default: + throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", + this.getClass().getSimpleName(), metaField)); + } + return metadataKey; + } + + @Override + public boolean isVirtual(MetaField metaField) { + return true; + } + + @Override + public Set<MetaField> supportedMetaFields() { + return EnumSet.of(MetaField.AUDIT_DATA_TIME); + } + } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java index d39114c3b7..6ffaf5f2da 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java @@ -132,9 +132,23 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata, return super.getPartitionFields(); } + @Override + public String getMetadataKey(MetaField metaField) { + String metadataKey; + switch (metaField) { + case AUDIT_DATA_TIME: + metadataKey = "audit_data_time"; + break; + default: + throw new UnsupportedOperationException(String.format("Unsupport meta field for %s: %s", + this.getClass().getSimpleName(), metaField)); + } + return metadataKey; + } + @Override public boolean isVirtual(MetaField metaField) { - return true; + return false; } @Override diff --git a/inlong-sort/sort-core/pom.xml b/inlong-sort/sort-core/pom.xml index 7228bfb59c..998905c038 100644 --- a/inlong-sort/sort-core/pom.xml +++ b/inlong-sort/sort-core/pom.xml @@ -293,6 +293,12 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-iceberg-v1.15</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 5065c78c1f..ac7baac4dd 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -421,4 +421,12 @@ public final class Constants { .booleanType() .defaultValue(false) .withDescription("Whether supporting auto create table when snapshot, default value is 'false'"); + + public static final ConfigOption<String> INNER_FORMAT = + ConfigOptions.key("inlong-msg.inner.format") + .stringType() + .noDefaultValue() + .withDescription( + "Inner format"); + } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml index d2b24382a6..7f4afdd848 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/pom.xml @@ -89,8 +89,6 @@ <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> - <scope>provided</scope> - <optional>true</optional> </dependency> <!-- Pulsar Client --> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java index 237de45b4d..4792d7b287 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchema.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.pulsar.table; +import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricsCollector; import org.apache.inlong.sort.base.metric.SourceMetricData; @@ -58,13 +59,15 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc private SourceMetricData sourceMetricData; + private MetricOption metricOption; + public PulsarTableDeserializationSchema( @Nullable DeserializationSchema<RowData> keyDeserialization, DeserializationSchema<RowData> valueDeserialization, TypeInformation<RowData> producedTypeInfo, PulsarRowDataConverter rowDataConverter, boolean upsertMode, - SourceMetricData sourceMetricData) { + MetricOption metricOption) { if (upsertMode) { checkNotNull(keyDeserialization, "upsert mode must specify a key format"); } @@ -73,7 +76,7 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc this.rowDataConverter = checkNotNull(rowDataConverter); this.producedTypeInfo = checkNotNull(producedTypeInfo); this.upsertMode = upsertMode; - this.sourceMetricData = sourceMetricData; + this.metricOption = metricOption; } @Override @@ -82,6 +85,9 @@ public class PulsarTableDeserializationSchema implements PulsarDeserializationSc if (keyDeserialization != null) { keyDeserialization.open(context); } + if (metricOption != null) { + sourceMetricData = new SourceMetricData(metricOption); + } valueDeserialization.open(context); } diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java index 360b07aa69..c063e26539 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/PulsarTableDeserializationSchemaFactory.java @@ -172,17 +172,13 @@ public class PulsarTableDeserializationSchemaFactory implements Serializable { .withAuditKeys(auditKeys) .build(); - if (metricOption != null) { - sourceMetricData = new SourceMetricData(metricOption); - } - return new PulsarTableDeserializationSchema( keyDeserialization, valueDeserialization, producedTypeInfo, rowDataConverter, upsertMode, - sourceMetricData); + metricOption); } public void setProducedDataType(DataType producedDataType) { diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java index 4645290c1b..0f67bbc072 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java +++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java @@ -139,6 +139,19 @@ public class InLongMsgDecodingFormat implements DecodingFormat<DeserializationSc enum ReadableMetadata { + DATA_TIME( + "data-time", + DataTypes.BIGINT().notNull(), + new MetadataConverter() { + + private static final long serialVersionUID = 1L; + + @Override + public Object read(InLongMsgHead head) { + return head.getTime().getTime(); + } + }), + CREATE_TIME( "create-time", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().notNull(), diff --git a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java index 16568d00c3..371ce84808 100644 --- a/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java +++ b/inlong-sort/sort-formats/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java @@ -54,6 +54,7 @@ public class InLongMsgUtils { // keys in attributes public static final String INLONGMSG_ATTR_STREAM_ID = "streamId"; + public static final String INLONGMSG_ATTR_TID = "tid"; public static final String INLONGMSG_ATTR_TIME_T = "t"; public static final String INLONGMSG_ATTR_TIME_DT = "dt"; public static final String INLONGMSG_ATTR_ADD_COLUMN_PREFIX = "__addcol"; @@ -118,6 +119,8 @@ public class InLongMsgUtils { if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) { streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID); + } else if (attributes.containsKey(INLONGMSG_ATTR_TID)) { + streamId = attributes.get(INLONGMSG_ATTR_TID); } else { throw new IllegalArgumentException("Could not find " + INLONGMSG_ATTR_STREAM_ID + " in attributes!"); }