This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 09bae48fe  [INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688)
09bae48fe is described below

commit 09bae48fe06662bc8cd795a9272573a7ccd9135a
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Thu Dec 1 14:47:54 2022 +0800

    [INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688)
---
 .../base/dirty/sink/log/LogDirtySinkFactory.java   |   9 +-
 .../base/dirty/sink/s3/S3DirtySinkFactory.java     |   8 +-
 .../base/dirty/utils/DirtySinkFactoryUtils.java    |  48 ++++++
 .../kafka/DynamicKafkaSerializationSchema.java     | 161 ++++++++++++++++-----
 .../apache/inlong/sort/kafka/KafkaDynamicSink.java |  18 ++-
 .../table/DynamicKafkaDeserializationSchema.java   |  59 +++++++-
 .../sort/kafka/table/KafkaDynamicSource.java       |  22 ++-
 .../sort/kafka/table/KafkaDynamicTableFactory.java |  49 +++++--
 .../table/UpsertKafkaDynamicTableFactory.java      |  25 +++-
 .../inlong/sort/parser/KafkaLoadSqlParseTest.java  | 150 +++++++++++++++++++
 10 files changed, 481 insertions(+), 68 deletions(-)
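Before the per-file hunks, it may help to see the knobs this patch adds in one place. Below is a minimal sketch of the table options that drive the feature, using only keys exercised by the KafkaLoadSqlParseTest cases at the end of this commit; the values are illustrative.

import java.util.LinkedHashMap;
import java.util.Map;

public class DirtyOptionsSketch {

    // Option keys are taken from the test cases in this commit; values are examples only.
    public static Map<String, String> dirtyTableOptions() {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("dirty.ignore", "true");                // drop dirty records instead of failing the job
        options.put("dirty.side-output.enable", "true");    // mirror dropped records to a side-output sink
        options.put("dirty.side-output.connector", "log");  // 'log' and 's3' are the factories touched here
        options.put("dirty.side-output.format", "csv");
        options.put("dirty.side-output.labels",
                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
        options.put("dirty.identifier", "inlong-student-${SYSTEM_TIME}");
        return options;
    }
}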
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
index c3720a93d..b07b581f1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
@@ -18,6 +18,8 @@ package org.apache.inlong.sort.base.dirty.sink.log;
 
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.factories.DynamicTableFactory.Context;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
@@ -38,9 +40,10 @@ public class LogDirtySinkFactory implements DirtySinkFactory {
 
     @Override
     public <T> DirtySink<T> createDirtySink(Context context) {
-        FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
-        String format = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FORMAT);
-        String fieldDelimiter = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+        ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
+        FactoryUtil.validateFactoryOptions(this, config);
+        String format = config.get(DIRTY_SIDE_OUTPUT_FORMAT);
+        String fieldDelimiter = config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
         return new LogDirtySink<>(format, fieldDelimiter,
                 context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
     }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
index d9ec26434..16310926c 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.base.dirty.sink.s3;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.factories.DynamicTableFactory.Context;
@@ -79,9 +80,10 @@ public class S3DirtySinkFactory implements DirtySinkFactory {
 
     @Override
     public <T> DirtySink<T> createDirtySink(Context context) {
-        FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
-        validate(context.getConfiguration());
-        return new S3DirtySink<>(getS3Options(context.getConfiguration()),
+        ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
+        FactoryUtil.validateFactoryOptions(this, config);
+        validate(config);
+        return new S3DirtySink<>(getS3Options(config),
                 context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
     }
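Both factories above now read their options from the catalog table rather than the global configuration, so a side-output sink is configured entirely through the table's 'dirty.*' options. Since discovery goes through FactoryUtil.discoverFactory (see DirtySinkFactoryUtils below), a third-party sink only needs its own factory. A minimal skeleton follows; it assumes DirtySinkFactory extends Flink's Factory interface (which the discoverFactory call requires), and MyDirtySink, the package, and the identifier are placeholders, not part of this commit.

package org.example.dirty; // hypothetical package, for illustration only

import java.util.Collections;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.factories.DynamicTableFactory.Context;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;
import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;

public class MyDirtySinkFactory implements DirtySinkFactory {

    @Override
    public <T> DirtySink<T> createDirtySink(Context context) {
        // Same pattern as the Log/S3 factories: options come from the table, not the context
        ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
        FactoryUtil.validateFactoryOptions(this, config);
        // MyDirtySink is a placeholder for a DirtySink<T> implementation
        return new MyDirtySink<>(config,
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
    }

    @Override
    public String factoryIdentifier() {
        // Matched against the 'dirty.side-output.connector' option at discovery time
        return "my-dirty-sink";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}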
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java
new file mode 100644
index 000000000..8f5fc9ff4
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+/**
+ * Dirty sink factory utils, it helps to create a dirty sink
+ */
+public final class DirtySinkFactoryUtils {
+
+    private DirtySinkFactoryUtils() {
+    }
+
+    public static <T> DirtySink<T> createDirtySink(Context context, DirtyOptions dirtyOptions) {
+        if (dirtyOptions == null) {
+            dirtyOptions = DirtyOptions.fromConfig(Configuration.fromMap(context.getCatalogTable().getOptions()));
+        }
+        dirtyOptions.validate();
+        DirtySink<T> dirtySink = null;
+        if (dirtyOptions.ignoreDirty() && dirtyOptions.enableDirtySideOutput()) {
+            DirtySinkFactory dirtySinkFactory = FactoryUtil.discoverFactory(context.getClassLoader(),
+                    DirtySinkFactory.class, dirtyOptions.getDirtyConnector());
+            dirtySink = dirtySinkFactory.createDirtySink(context);
+        }
+        return dirtySink;
+    }
+}
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
index be9c48480..6b0b733a9 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.kafka;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
@@ -28,6 +29,10 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.kafka.KafkaDynamicSink.WritableMetadata;
@@ -36,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -80,6 +84,8 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
     private final String sinkMultipleFormat;
     private boolean multipleSink;
     private JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
+    private final DirtyOptions dirtyOptions;
+    private final @Nullable DirtySink<Object> dirtySink;
 
     private int[] partitions;
     private int parallelInstanceId;
@@ -97,7 +103,9 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
             int[] metadataPositions,
             boolean upsertMode,
             @Nullable String sinkMultipleFormat,
-            @Nullable String topicPattern) {
+            @Nullable String topicPattern,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keySerialization != null && keyFieldGetters.length > 0,
@@ -114,6 +122,8 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
         this.upsertMode = upsertMode;
         this.sinkMultipleFormat = sinkMultipleFormat;
         this.topicPattern = topicPattern;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     static RowData createProjectedRow(
@@ -135,9 +145,12 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
         if (partitioner != null) {
             partitioner.open(parallelInstanceId, numParallelInstances);
         }
+        if (dirtySink != null) {
+            dirtySink.open(new Configuration());
+        }
         // Only support dynamic topic when the topicPattern is specified
         // and the valueSerialization is RawFormatSerializationSchema
-        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(sinkMultipleFormat)) {
             multipleSink = true;
             jsonDynamicSchemaFormat =
                     (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
@@ -148,19 +161,24 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
     public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
         // shortcut in case no input projection is required
         if (keySerialization == null && !hasMetadata) {
-            final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
-            return new ProducerRecord<>(
-                    getTargetTopic(consumedRow),
-                    extractPartition(consumedRow, null, valueSerialized),
-                    null,
-                    valueSerialized);
+            final byte[] valueSerialized = serializeWithDirtyHandle(consumedRow,
+                    DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
+            if (valueSerialized != null) {
+                return new ProducerRecord<>(
+                        getTargetTopic(consumedRow),
+                        extractPartition(consumedRow, null, valueSerialized),
+                        null,
+                        valueSerialized);
+            }
         }
         final byte[] keySerialized;
+        boolean mayDirtyData = false;
         if (keySerialization == null) {
             keySerialized = null;
         } else {
             final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters);
-            keySerialized = keySerialization.serialize(keyRow);
+            keySerialized = serializeWithDirtyHandle(keyRow, DirtyType.KEY_SERIALIZE_ERROR, keySerialization);
+            mayDirtyData = keySerialized == null;
         }
 
         final byte[] valueSerialized;
@@ -173,10 +191,16 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
             } else {
                 // make the message to be INSERT to be compliant with the INSERT-ONLY format
                 valueRow.setRowKind(RowKind.INSERT);
-                valueSerialized = valueSerialization.serialize(valueRow);
+                valueSerialized = serializeWithDirtyHandle(valueRow,
+                        DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
+                mayDirtyData = mayDirtyData || valueSerialized == null;
             }
         } else {
-            valueSerialized = valueSerialization.serialize(valueRow);
+            valueSerialized = serializeWithDirtyHandle(valueRow, DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
+            mayDirtyData = mayDirtyData || valueSerialized == null;
+        }
+        if (mayDirtyData) {
+            return null;
         }
         return new ProducerRecord<>(
                 getTargetTopic(consumedRow),
@@ -187,6 +211,67 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
                 readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.HEADERS));
     }
 
+    private byte[] serializeWithDirtyHandle(RowData consumedRow, DirtyType dirtyType,
+            SerializationSchema<RowData> serialization) {
+        if (!dirtyOptions.ignoreDirty()) {
+            return serialization.serialize(consumedRow);
+        }
+        byte[] value = null;
+        try {
+            value = serialization.serialize(consumedRow);
+        } catch (Exception e) {
+            LOG.error(String.format("serialize error, raw data: %s", consumedRow.toString()), e);
+            if (dirtySink != null) {
+                DirtyData.Builder<Object> builder = DirtyData.builder();
+                try {
+                    builder.setData(consumedRow)
+                            .setDirtyType(dirtyType)
+                            .setLabels(dirtyOptions.getLabels())
+                            .setLogTag(dirtyOptions.getLogTag())
+                            .setIdentifier(dirtyOptions.getIdentifier());
+                    dirtySink.invoke(builder.build());
+                } catch (Exception ex) {
+                    if (!dirtyOptions.ignoreSideOutputErrors()) {
+                        throw new RuntimeException(ex);
+                    }
+                    LOG.warn("Dirty sink failed", ex);
+                }
+            }
+        }
+        return value;
+    }
+
+    private void serializeWithDirtyHandle(Map<String, Object> baseMap, JsonNode rootNode,
+            JsonNode dataNode, List<ProducerRecord<byte[], byte[]>> values) {
+        try {
+            byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
+            values.add(new ProducerRecord<>(
+                    jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+                    extractPartition(null, null, data), null, data));
+        } catch (Exception e) {
+            LOG.error(String.format("serialize error, raw data: %s", baseMap), e);
+            if (!dirtyOptions.ignoreDirty()) {
+                throw new RuntimeException(e);
+            }
+            if (dirtySink != null) {
+                DirtyData.Builder<Object> builder = DirtyData.builder();
+                try {
+                    builder.setData(dataNode)
+                            .setDirtyType(DirtyType.VALUE_DESERIALIZE_ERROR)
+                            .setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels()))
+                            .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag()))
+                            .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier()));
+                    dirtySink.invoke(builder.build());
+                } catch (Exception ex) {
+                    if (!dirtyOptions.ignoreSideOutputErrors()) {
+                        throw new RuntimeException(ex);
+                    }
+                    LOG.warn("Dirty sink failed", ex);
+                }
+            }
+        }
+    }
+
     /**
      * Serialize for list, it is used for multiple-sink scenes when a record contains multiple real records.
     *
@@ -195,10 +280,14 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
      * @return List of ProducerRecord
      */
    public List<ProducerRecord<byte[], byte[]>> serializeForList(RowData consumedRow, @Nullable Long timestamp) {
+        List<ProducerRecord<byte[], byte[]>> values = new ArrayList<>();
         if (!multipleSink) {
-            return Collections.singletonList(serialize(consumedRow, timestamp));
+            ProducerRecord<byte[], byte[]> value = serialize(consumedRow, timestamp);
+            if (value != null) {
+                values.add(value);
+            }
+            return values;
         }
-        List<ProducerRecord<byte[], byte[]>> values = new ArrayList<>();
         try {
             JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(consumedRow.getBinary(0));
             boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
@@ -220,9 +309,27 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
             } else {
                 split2JsonArray(rootNode, updateBeforeNode, updateAfterNode, values);
             }
-        } catch (IOException e) {
-            LOG.warn("deserialize error", e);
-            values.add(new ProducerRecord<>(topic, null, null, consumedRow.getBinary(0)));
+        } catch (Exception e) {
+            LOG.error(String.format("serialize error, raw data: %s", new String(consumedRow.getBinary(0))), e);
+            if (!dirtyOptions.ignoreDirty()) {
+                throw new RuntimeException(e);
+            }
+            if (dirtySink != null) {
+                DirtyData.Builder<Object> builder = DirtyData.builder();
+                try {
+                    builder.setData(new String(consumedRow.getBinary(0)))
+                            .setDirtyType(DirtyType.VALUE_DESERIALIZE_ERROR)
+                            .setLabels(dirtyOptions.getLabels())
+                            .setLogTag(dirtyOptions.getLogTag())
+                            .setIdentifier(dirtyOptions.getIdentifier());
+                    dirtySink.invoke(builder.build());
+                } catch (Exception ex) {
+                    if (!dirtyOptions.ignoreSideOutputErrors()) {
+                        throw new RuntimeException(ex);
+                    }
+                    LOG.warn("Dirty sink failed", ex);
+                }
+            }
         }
         return values;
     }
@@ -261,27 +368,13 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
                 } else if (updateBeforeKey != null) {
                     baseMap.remove(updateBeforeKey);
                 }
-                try {
-                    byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
-                    values.add(new ProducerRecord<>(
-                            jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
-                            extractPartition(null, null, data), null, data));
-                } catch (Exception e) {
-                    throw new RuntimeException("serialize for list error", e);
-                }
+                serializeWithDirtyHandle(baseMap, rootNode, updateAfterNode.get(i), values);
             }
         } else {
             // In general, it will not run to this branch
             for (int i = 0; i < updateBeforeNode.size(); i++) {
                 baseMap.put(updateBeforeKey, Collections.singletonList(updateBeforeNode.get(i)));
-                try {
-                    byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
-                    values.add(new ProducerRecord<>(
-                            jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
-                            extractPartition(null, null, data), null, data));
-                } catch (Exception e) {
-                    throw new RuntimeException("serialize for list error", e);
-                }
+                serializeWithDirtyHandle(baseMap, rootNode, updateBeforeNode.get(i), values);
             }
         }
@@ -308,7 +401,7 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
             // Extract the index '0' as the raw data is determined by the Raw format:
             // The Raw format allows to read and write raw (byte based) values as a single column
             return jsonDynamicSchemaFormat.parse(element.getBinary(0), topicPattern);
-        } catch (IOException e) {
+        } catch (Exception e) {
             // Ignore the parse error and it will return the default topic finally.
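Note the contract change above: serialize() now returns null for a record it has diverted to the side output, and serializeForList() filters those nulls out before handing records to the producer. The wrapper pattern in isolation, as a generic sketch in plain Java (not the InLong API):

import java.util.function.Consumer;
import java.util.function.Function;

final class DirtyHandles {

    // On serialization failure, route the raw input to a side output and
    // return null so the caller can drop the record from the main stream.
    static <IN> byte[] serializeOrSideOutput(
            IN input,
            Function<IN, byte[]> serializer,
            Consumer<IN> sideOutput, // e.g. DirtySink::invoke
            boolean ignoreSideOutputErrors) {
        try {
            return serializer.apply(input);
        } catch (Exception e) {
            try {
                sideOutput.accept(input); // mirror the failed record
            } catch (Exception ex) {
                if (!ignoreSideOutputErrors) {
                    throw new RuntimeException(ex);
                }
            }
            return null; // caller must skip null results
        }
    }
}

The caller is responsible for skipping null results, which is exactly what serializeForList() does in the hunk above.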
LOG.warn("parse dynamic topic error", e); } diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java index acfa5d8f4..63a1593f9 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java @@ -37,6 +37,8 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.utils.DataTypeUtils; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.kafka.DynamicKafkaSerializationSchema.MetadataConverter; import org.apache.kafka.common.header.Header; import org.slf4j.Logger; @@ -145,6 +147,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada */ private final String auditHostAndPorts; private @Nullable final String sinkMultipleFormat; + private final DirtyOptions dirtyOptions; + private @Nullable final DirtySink<Object> dirtySink; /** * Metadata that is appended at the end of a physical sink row. */ @@ -176,7 +180,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada String inlongMetric, String auditHostAndPorts, @Nullable String sinkMultipleFormat, - @Nullable String topicPattern) { + @Nullable String topicPattern, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { // Format attributes this.consumedDataType = checkNotNull(consumedDataType, "Consumed data type must not be null."); @@ -207,6 +213,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada this.auditHostAndPorts = auditHostAndPorts; this.sinkMultipleFormat = sinkMultipleFormat; this.topicPattern = topicPattern; + this.dirtyOptions = dirtyOptions; + this.dirtySink = dirtySink; } @Override @@ -310,7 +318,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada inlongMetric, auditHostAndPorts, sinkMultipleFormat, - topicPattern); + topicPattern, + dirtyOptions, + dirtySink); copy.metadataKeys = metadataKeys; return copy; } @@ -422,7 +432,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada metadataPositions, upsertMode, sinkMultipleFormat, - topicPattern); + topicPattern, + dirtyOptions, + dirtySink); return new FlinkKafkaProducer<>( topic, diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java index dc218c900..5dd3fcaf7 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java @@ -20,6 +20,7 @@ package org.apache.inlong.sort.kafka.table; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import 
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource; import org.apache.flink.table.data.GenericRowData; @@ -28,10 +29,17 @@ import org.apache.flink.types.DeserializationException; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; +import org.apache.inlong.sort.base.dirty.DirtyData; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.DirtyType; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -43,6 +51,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(DynamicKafkaDeserializationSchema.class); + private final @Nullable DeserializationSchema<RowData> keyDeserialization; private final DeserializationSchema<RowData> valueDeserialization; @@ -57,6 +67,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc private final boolean upsertMode; + private final DirtyOptions dirtyOptions; + private final @Nullable DirtySink<String> dirtySink; private SourceMetricData metricData; DynamicKafkaDeserializationSchema( @@ -68,7 +80,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc boolean hasMetadata, MetadataConverter[] metadataConverters, TypeInformation<RowData> producedTypeInfo, - boolean upsertMode) { + boolean upsertMode, + DirtyOptions dirtyOptions, + @Nullable DirtySink<String> dirtySink) { if (upsertMode) { Preconditions.checkArgument( keyDeserialization != null && keyProjection.length > 0, @@ -87,6 +101,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc upsertMode); this.producedTypeInfo = producedTypeInfo; this.upsertMode = upsertMode; + this.dirtyOptions = dirtyOptions; + this.dirtySink = dirtySink; } public void setMetricData(SourceMetricData metricData) { @@ -99,6 +115,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc keyDeserialization.open(context); } valueDeserialization.open(context); + if (dirtySink != null) { + dirtySink.open(new Configuration()); + } } @Override @@ -117,7 +136,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc // shortcut in case no output projection is required, // also not for a cartesian product with the keys if (keyDeserialization == null && !hasMetadata) { - valueDeserialization.deserialize(record.value(), collector); + deserializeWithDirtyHandle(record.value(), DirtyType.VALUE_DESERIALIZE_ERROR, + valueDeserialization, collector); // output metrics if (metricData != null) { outputMetrics(record); @@ -127,7 +147,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc // buffer key(s) if (keyDeserialization != null) { - keyDeserialization.deserialize(record.key(), keyCollector); + deserializeWithDirtyHandle(record.key(), DirtyType.KEY_DESERIALIZE_ERROR, + keyDeserialization, keyCollector); } // project output while emitting values @@ -139,7 +160,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc // collect tombstone messages in upsert 
mode by hand outputCollector.collect(null); } else { - valueDeserialization.deserialize(record.value(), outputCollector); + deserializeWithDirtyHandle(record.value(), DirtyType.VALUE_DESERIALIZE_ERROR, + valueDeserialization, outputCollector); // output metrics if (metricData != null) { outputMetrics(record); @@ -149,6 +171,35 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc keyCollector.buffer.clear(); } + private void deserializeWithDirtyHandle(byte[] value, DirtyType dirtyType, + DeserializationSchema<RowData> deserialization, Collector<RowData> collector) throws IOException { + if (!dirtyOptions.ignoreDirty()) { + deserialization.deserialize(value, collector); + } else { + try { + deserialization.deserialize(value, collector); + } catch (IOException e) { + LOG.error(String.format("deserialize error, raw data: %s", new String(value)), e); + if (dirtySink != null) { + DirtyData.Builder<String> builder = DirtyData.builder(); + try { + builder.setData(new String(value)) + .setDirtyType(dirtyType) + .setLabels(dirtyOptions.getLabels()) + .setLogTag(dirtyOptions.getLogTag()) + .setIdentifier(dirtyOptions.getIdentifier()); + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.ignoreSideOutputErrors()) { + throw new IOException(ex); + } + LOG.warn("Dirty sink failed", ex); + } + } + } + } + } + private void outputMetrics(ConsumerRecord<byte[], byte[]> record) { long dataSize = record.value() == null ? 0L : record.value().length; if (metricData != null) { diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java index 1d5932968..7525ffae1 100644 --- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java +++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java @@ -43,6 +43,8 @@ import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; import org.apache.flink.util.Preconditions; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.kafka.FlinkKafkaConsumer; import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -51,7 +53,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -178,6 +179,9 @@ public class KafkaDynamicSource protected final String auditHostAndPorts; + private final DirtyOptions dirtyOptions; + private @Nullable final DirtySink<String> dirtySink; + private static final ObjectMapper MAPPER = new ObjectMapper(); private static final Logger LOG = LoggerFactory.getLogger(KafkaDynamicSource.class); @@ -197,7 +201,9 @@ public class KafkaDynamicSource long startupTimestampMillis, boolean upsertMode, final String inlongMetric, - final String auditHostAndPorts) { + final String auditHostAndPorts, + DirtyOptions dirtyOptions, + @Nullable DirtySink<String> dirtySink) { // Format attributes this.physicalDataType = Preconditions.checkNotNull( @@ -232,6 +238,8 @@ public class KafkaDynamicSource this.upsertMode = upsertMode; 
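One asymmetry worth noting between the two schema classes: the deserialization side types its side output as DirtySink<String> (the failed bytes are forwarded as a string), while the serialization side uses DirtySink<Object>. The handoff itself is the same DirtyData builder everywhere; a compact sketch of that recurring step, where the names come from this diff and the wrapper method itself is illustrative:

import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;

final class DirtyHandoff {

    // Forward one failed record to the side output; mirrors the pattern that
    // recurs in the hunks above. invoke() may throw, so callers wrap it.
    static void sideOutput(byte[] rawBytes, DirtyOptions dirtyOptions,
            DirtySink<String> dirtySink, DirtyType dirtyType) throws Exception {
        DirtyData.Builder<String> builder = DirtyData.builder();
        builder.setData(new String(rawBytes))        // the record that failed
                .setDirtyType(dirtyType)             // KEY_/VALUE_DESERIALIZE_ERROR, ...
                .setLabels(dirtyOptions.getLabels()) // e.g. "DIRTY_TYPE=${DIRTY_TYPE}&..."
                .setLogTag(dirtyOptions.getLogTag())
                .setIdentifier(dirtyOptions.getIdentifier());
        dirtySink.invoke(builder.build());
    }
}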
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 1d5932968..7525ffae1 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -43,6 +43,8 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.kafka.FlinkKafkaConsumer;
 import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -51,7 +53,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -178,6 +179,9 @@ public class KafkaDynamicSource
 
     protected final String auditHostAndPorts;
 
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<String> dirtySink;
+
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaDynamicSource.class);
@@ -197,7 +201,9 @@ public class KafkaDynamicSource
             long startupTimestampMillis,
             boolean upsertMode,
             final String inlongMetric,
-            final String auditHostAndPorts) {
+            final String auditHostAndPorts,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<String> dirtySink) {
         // Format attributes
         this.physicalDataType = Preconditions.checkNotNull(
@@ -232,6 +238,8 @@ public class KafkaDynamicSource
         this.upsertMode = upsertMode;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     @Override
@@ -322,7 +330,11 @@ public class KafkaDynamicSource
                 startupMode,
                 specificStartupOffsets,
                 startupTimestampMillis,
-                upsertMode, inlongMetric, auditHostAndPorts);
+                upsertMode,
+                inlongMetric,
+                auditHostAndPorts,
+                dirtyOptions,
+                dirtySink);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
@@ -427,7 +439,9 @@ public class KafkaDynamicSource
                 hasMetadata,
                 metadataConverters,
                 producedTypeInfo,
-                upsertMode);
+                upsertMode,
+                dirtyOptions,
+                dirtySink);
 
         final FlinkKafkaConsumer<RowData> kafkaConsumer;
         if (topics != null) {
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 5f9debe1d..b2e563193 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -51,6 +51,9 @@ import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.kafka.KafkaDynamicSink;
 import org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner;
@@ -95,6 +98,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.get
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
@@ -322,7 +326,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
         final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
                 getValueDecodingFormat(helper);
 
-        helper.validateExcept(PROPERTIES_PREFIX);
+        helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
 
         validateTableSourceOptions(tableOptions);
 
@@ -354,7 +358,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 
         final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
         final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
-
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<String> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         return createKafkaTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -369,7 +375,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 startupOptions.specificOffsets,
                 startupOptions.startupTimestampMillis,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                dirtyOptions,
+                dirtySink);
     }
 
     @Override
@@ -387,7 +395,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 getValueEncodingFormat(helper);
         final String sinkMultipleFormat = getSinkMultipleFormat(helper);
 
-        helper.validateExcept(PROPERTIES_PREFIX);
+        helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
 
         validateSinkPartitioner(tableOptions);
         validateSinkSemantic(tableOptions);
@@ -411,7 +419,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
 
         final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
         final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
-
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         return createKafkaTableSink(
                 physicalDataType,
                 keyEncodingFormat.orElse(null),
@@ -428,13 +438,14 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 inlongMetric,
                 auditHostAndPorts,
                 sinkMultipleFormat,
-                tableOptions.getOptional(TOPIC_PATTERN).orElse(null));
+                tableOptions.getOptional(TOPIC_PATTERN).orElse(null),
+                dirtyOptions,
+                dirtySink);
     }
 
     private void validateSinkMultipleFormatAndPhysicalDataType(DataType physicalDataType,
             EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat, String sinkMultipleFormat) {
-        if (valueEncodingFormat instanceof RawFormatSerializationSchema
-                && StringUtils.isNotBlank(sinkMultipleFormat)) {
+        if (multipleSink(valueEncodingFormat, sinkMultipleFormat)) {
             DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
             Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
             if (!supportFormats.contains(sinkMultipleFormat)) {
@@ -451,6 +462,12 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
         }
     }
 
+    private boolean multipleSink(EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            String sinkMultipleFormat) {
+        return valueEncodingFormat instanceof RawFormatSerializationSchema
+                && StringUtils.isNotBlank(sinkMultipleFormat);
+    }
+
     // --------------------------------------------------------------------------------------------
 
     protected KafkaDynamicSource createKafkaTableSource(
@@ -467,7 +484,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
             long startupTimestampMillis,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<String> dirtySink) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -483,7 +502,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 startupTimestampMillis,
                 false,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                dirtyOptions,
+                dirtySink);
     }
 
     protected KafkaDynamicSink createKafkaTableSink(
@@ -502,7 +523,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
             String inlongMetric,
             String auditHostAndPorts,
             @Nullable String sinkMultipleFormat,
-            @Nullable String topicPattern) {
+            @Nullable String topicPattern,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         return new KafkaDynamicSink(
                 physicalDataType,
                 physicalDataType,
@@ -522,6 +545,8 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 inlongMetric,
                 auditHostAndPorts,
                 sinkMultipleFormat,
-                topicPattern);
+                topicPattern,
+                dirtyOptions,
+                dirtySink);
     }
 }
\ No newline at end of file
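With DIRTY_PREFIX excluded from option validation above, the 'dirty.*' keys can be declared directly in a table's WITH clause. A hedged sketch of a matching DDL, driven through the Java Table API the tests below also use; the 'kafka-inlong' connector identifier and the topic/server values are assumptions, while the dirty keys mirror the test properties:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DirtyKafkaSinkDdlExample {

    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Connector identifier, topic, and servers are assumed values here.
        tableEnv.executeSql(
                "CREATE TABLE kafka_output (\n"
                        + "  `id` BIGINT,\n"
                        + "  `name` STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka-inlong',\n"
                        + "  'topic' = 'topic_dirty_output',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'format' = 'json',\n"
                        + "  'dirty.ignore' = 'true',\n"
                        + "  'dirty.side-output.enable' = 'true',\n"
                        + "  'dirty.side-output.connector' = 'log',\n"
                        + "  'dirty.side-output.format' = 'csv',\n"
                        + "  'dirty.side-output.labels' = "
                        + "'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}'\n"
                        + ")");
    }
}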
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index b53565357..a5da814d0 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -45,6 +45,9 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import org.apache.inlong.sort.kafka.KafkaDynamicSink;
 
 import java.time.Duration;
@@ -67,6 +70,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.aut
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG;
@@ -200,7 +204,7 @@ public class UpsertKafkaDynamicTableFactory
                 helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
 
         // Validate the option data type.
-        helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+        helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX, DIRTY_PREFIX);
         TableSchema schema = context.getCatalogTable().getSchema();
         validateSource(tableOptions, keyDecodingFormat, valueDecodingFormat, schema);
@@ -210,7 +214,9 @@ public class UpsertKafkaDynamicTableFactory
         Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
         // always use earliest to keep data integrity
         StartupMode earliest = StartupMode.EARLIEST;
-
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<String> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         return new KafkaDynamicSource(
                 schema.toPhysicalRowDataType(),
                 keyDecodingFormat,
@@ -224,7 +230,11 @@ public class UpsertKafkaDynamicTableFactory
                 earliest,
                 Collections.emptyMap(),
                 0,
-                true, null, null);
+                true,
+                null,
+                null,
+                dirtyOptions,
+                dirtySink);
     }
 
     @Override
@@ -241,7 +251,7 @@ public class UpsertKafkaDynamicTableFactory
                 helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT);
 
         // Validate the option data type.
-        helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+        helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX, DIRTY_PREFIX);
         TableSchema schema = context.getCatalogTable().getSchema();
         validateSink(tableOptions, keyEncodingFormat, valueEncodingFormat, schema);
@@ -258,6 +268,9 @@ public class UpsertKafkaDynamicTableFactory
                 new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
         String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
         final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
 
         // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
         // it will use hash partition if key is set else in round-robin behaviour.
@@ -280,7 +293,9 @@ public class UpsertKafkaDynamicTableFactory
                 inlongMetric,
                 auditHostAndPorts,
                 null,
-                null);
+                null,
+                dirtyOptions,
+                dirtySink);
     }
 
     private Tuple2<int[], int[]> createKeyValueProjections(CatalogTable catalogTable) {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
index 18b39eb57..52e3d6f6b 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.VarBinaryFormatInfo;
 import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
@@ -45,6 +47,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -109,6 +112,97 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase {
                 "raw-hash", pattern);
     }
 
+    private KafkaExtractNode buildDirtyKafkaExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
+        JsonFormat jsonFormat = new JsonFormat();
+        jsonFormat.setIgnoreParseErrors(false);
+        return new KafkaExtractNode("1", "kafka_input", fields,
+                null, properties, "topic_dirty_input", "localhost:9092",
+                jsonFormat, KafkaScanStartupMode.EARLIEST_OFFSET, null,
+                "test_group", null, null);
+    }
+
+    private KafkaLoadNode buildDirtyKafkaLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        List<FieldRelation> relations = Arrays.asList(
+                new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldInfo("age", new IntFormatInfo())),
+                new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                        new FieldInfo("name", new StringFormatInfo())));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "s3");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
+        properties.put("dirty.side-output.s3.bucket", "s3-test-bucket");
+        properties.put("dirty.side-output.s3.endpoint", "s3.test.endpoint");
+        properties.put("dirty.side-output.s3.key", "dirty/test");
+        properties.put("dirty.side-output.s3.region", "region");
+        properties.put("dirty.side-output.s3.access-key-id", "access_key_id");
+        properties.put("dirty.side-output.s3.secret-key-id", "secret_key_id");
+        properties.put("dirty.identifier", "inlong-student-${SYSTEM_TIME}");
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+                null, "topic_dirty_output", "localhost:9092", new JsonFormat(),
+                null, properties, null);
+    }
+
+    private KafkaExtractNode buildDirtyKafkaExtractNodeWithRawFormat() {
+        List<FieldInfo> fields = Collections.singletonList(new FieldInfo("raw", new VarBinaryFormatInfo()));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
+        return new KafkaExtractNode("1", "kafka_input", fields,
+                null, properties, "topic_dirty_input", "localhost:9092",
+                new RawFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null,
+                "test_group", null, null);
+    }
+
+    private KafkaLoadNode buildDirtyKafkaLoadNodeWithDynamicTopic() {
+        List<FieldInfo> fields = Collections.singletonList(new FieldInfo("raw", new VarBinaryFormatInfo()));
+        List<FieldRelation> relations = Collections.singletonList(
+                new FieldRelation(new FieldInfo("raw", new StringFormatInfo()),
+                        new FieldInfo("raw", new StringFormatInfo())));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=${database}&table=${table}");
+        properties.put("dirty.identifier", "${database}-${table}-${SYSTEM_TIME}");
+        properties.put("dirty.side-output.s3.bucket", "s3-test-bucket");
+        properties.put("dirty.side-output.s3.endpoint", "s3.test.endpoint");
+        properties.put("dirty.side-output.s3.key", "dirty/test");
+        properties.put("dirty.side-output.s3.region", "region");
+        properties.put("dirty.side-output.s3.access-key-id", "access_key_id");
+        properties.put("dirty.side-output.s3.secret-key-id", "secret_key_id");
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+                null, "topic_dirty_output", "localhost:9092", new RawFormat(),
+                null, properties, null, new CanalJsonFormat(), "topic_dirty_output",
+                null, null);
+    }
+
     /**
      * build node relation
      *
@@ -233,4 +327,60 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase {
         ParseResult result = parser.parse();
         Assert.assertTrue(result.tryExecute());
     }
+
+    /**
+     * Test dirty-data handling for the Kafka source and Kafka sink.
+     * This case uses the 'log' side-output for the source and the 's3' side-output for the sink.
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testKafkaDirtyHandleSqlParse() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildDirtyKafkaExtractNode();
+        Node outputNode = buildDirtyKafkaLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+    /**
+     * Test dirty-data handling for the Kafka source and Kafka sink with a dynamic topic.
+     * This case uses the 'log' side-output for the source and the 's3' side-output for the sink.
+     *
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testKafkaDirtyHandleWithDynamicTopicSqlParse() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildDirtyKafkaExtractNodeWithRawFormat();
+        Node outputNode = buildDirtyKafkaLoadNodeWithDynamicTopic();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode),
+                Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
 }