This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 09bae48fe [INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688)
09bae48fe is described below

commit 09bae48fe06662bc8cd795a9272573a7ccd9135a
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Thu Dec 1 14:47:54 2022 +0800

    [INLONG-6671][Sort] Supports dirty data side-output for kafka connector (#6688)
---
 .../base/dirty/sink/log/LogDirtySinkFactory.java   |   9 +-
 .../base/dirty/sink/s3/S3DirtySinkFactory.java     |   8 +-
 .../base/dirty/utils/DirtySinkFactoryUtils.java    |  48 ++++++
 .../kafka/DynamicKafkaSerializationSchema.java     | 161 ++++++++++++++++-----
 .../apache/inlong/sort/kafka/KafkaDynamicSink.java |  18 ++-
 .../table/DynamicKafkaDeserializationSchema.java   |  59 +++++++-
 .../sort/kafka/table/KafkaDynamicSource.java       |  22 ++-
 .../sort/kafka/table/KafkaDynamicTableFactory.java |  49 +++++--
 .../table/UpsertKafkaDynamicTableFactory.java      |  25 +++-
 .../inlong/sort/parser/KafkaLoadSqlParseTest.java  | 150 +++++++++++++++++++
 10 files changed, 481 insertions(+), 68 deletions(-)
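The dirty-data handling added by this commit is driven entirely by 'dirty.*' table options. A minimal Java sketch of the option keys exercised by the new KafkaLoadSqlParseTest further down; the values are illustrative and the inline comments reflect behaviour inferred from this diff rather than authoritative documentation:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class DirtyOptionsExample {
        public static void main(String[] args) {
            // Table options enabling dirty-data side-output on a Kafka node,
            // mirroring the keys used by KafkaLoadSqlParseTest in this commit.
            Map<String, String> options = new LinkedHashMap<>();
            options.put("dirty.ignore", "true");               // skip dirty records instead of failing the job
            options.put("dirty.side-output.enable", "true");   // forward skipped records to a side output
            options.put("dirty.side-output.connector", "log"); // 'log' or 's3' dirty sink
            options.put("dirty.side-output.format", "csv");
            options.put("dirty.side-output.labels",
                    "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
            options.forEach((k, v) -> System.out.println(k + "=" + v));
        }
    }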

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
index c3720a93d..b07b581f1 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySinkFactory.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.sort.base.dirty.sink.log;
 
 import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.factories.DynamicTableFactory.Context;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
@@ -38,9 +40,10 @@ public class LogDirtySinkFactory implements DirtySinkFactory {
 
     @Override
     public <T> DirtySink<T> createDirtySink(Context context) {
-        FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
-        String format = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FORMAT);
-        String fieldDelimiter = context.getConfiguration().get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
+        ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
+        FactoryUtil.validateFactoryOptions(this, config);
+        String format = config.get(DIRTY_SIDE_OUTPUT_FORMAT);
+        String fieldDelimiter = config.get(DIRTY_SIDE_OUTPUT_FIELD_DELIMITER);
         return new LogDirtySink<>(format, fieldDelimiter,
                 context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
     }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
index d9ec26434..16310926c 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySinkFactory.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.base.dirty.sink.s3;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.factories.DynamicTableFactory.Context;
@@ -79,9 +80,10 @@ public class S3DirtySinkFactory implements DirtySinkFactory {
 
     @Override
     public <T> DirtySink<T> createDirtySink(Context context) {
-        FactoryUtil.validateFactoryOptions(this, context.getConfiguration());
-        validate(context.getConfiguration());
-        return new S3DirtySink<>(getS3Options(context.getConfiguration()),
+        ReadableConfig config = Configuration.fromMap(context.getCatalogTable().getOptions());
+        FactoryUtil.validateFactoryOptions(this, config);
+        validate(config);
+        return new S3DirtySink<>(getS3Options(config),
                 context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType());
     }
 
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java
new file mode 100644
index 000000000..8f5fc9ff4
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/DirtySinkFactoryUtils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty.utils;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.sink.DirtySinkFactory;
+
+/**
+ * Dirty sink factory utils, it helps to create a dirty sink
+ */
+public final class DirtySinkFactoryUtils {
+
+    private DirtySinkFactoryUtils() {
+    }
+
+    public static <T> DirtySink<T> createDirtySink(Context context, DirtyOptions dirtyOptions) {
+        if (dirtyOptions == null) {
+            dirtyOptions = DirtyOptions.fromConfig(Configuration.fromMap(context.getCatalogTable().getOptions()));
+        }
+        dirtyOptions.validate();
+        DirtySink<T> dirtySink = null;
+        if (dirtyOptions.ignoreDirty() && dirtyOptions.enableDirtySideOutput()) {
+            DirtySinkFactory dirtySinkFactory = FactoryUtil.discoverFactory(context.getClassLoader(),
+                    DirtySinkFactory.class, dirtyOptions.getDirtyConnector());
+            dirtySink = dirtySinkFactory.createDirtySink(context);
+        }
+        return dirtySink;
+    }
+}
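The Kafka table factories further down in this diff wire the new utility in roughly as follows; a condensed sketch, assuming DirtyOptions.fromConfig accepts a ReadableConfig as both call sites in this commit suggest:

    import org.apache.flink.configuration.ReadableConfig;
    import org.apache.flink.table.factories.DynamicTableFactory.Context;
    import org.apache.inlong.sort.base.dirty.DirtyOptions;
    import org.apache.inlong.sort.base.dirty.sink.DirtySink;
    import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;

    final class DirtySinkWiringSketch {
        static DirtySink<Object> buildDirtySink(Context context, ReadableConfig tableOptions) {
            // Parse the 'dirty.*' options, then discover the configured dirty sink
            // factory ('log', 's3', ...) on the classpath; the result is null when the
            // side output is disabled or dirty records are not ignored.
            DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
            return DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
        }
    }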
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
index be9c48480..6b0b733a9 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.kafka;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.streaming.connectors.kafka.KafkaContextAware;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
@@ -28,6 +29,10 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.kafka.KafkaDynamicSink.WritableMetadata;
@@ -36,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -80,6 +84,8 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
     private final String sinkMultipleFormat;
     private boolean multipleSink;
     private JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
+    private final DirtyOptions dirtyOptions;
+    private final @Nullable DirtySink<Object> dirtySink;
     private int[] partitions;
 
     private int parallelInstanceId;
@@ -97,7 +103,9 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
             int[] metadataPositions,
             boolean upsertMode,
             @Nullable String sinkMultipleFormat,
-            @Nullable String topicPattern) {
+            @Nullable String topicPattern,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keySerialization != null && keyFieldGetters.length > 0,
@@ -114,6 +122,8 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
         this.upsertMode = upsertMode;
         this.sinkMultipleFormat = sinkMultipleFormat;
         this.topicPattern = topicPattern;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     static RowData createProjectedRow(
@@ -135,9 +145,12 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
         if (partitioner != null) {
             partitioner.open(parallelInstanceId, numParallelInstances);
         }
+        if (dirtySink != null) {
+            dirtySink.open(new Configuration());
+        }
         // Only support dynamic topic when the topicPattern is specified
         // and the valueSerialization is RawFormatSerializationSchema
-        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(sinkMultipleFormat)) {
             multipleSink = true;
             jsonDynamicSchemaFormat =
                     (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
@@ -148,19 +161,24 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
     public ProducerRecord<byte[], byte[]> serialize(RowData consumedRow, @Nullable Long timestamp) {
         // shortcut in case no input projection is required
         if (keySerialization == null && !hasMetadata) {
-            final byte[] valueSerialized = valueSerialization.serialize(consumedRow);
-            return new ProducerRecord<>(
-                    getTargetTopic(consumedRow),
-                    extractPartition(consumedRow, null, valueSerialized),
-                    null,
-                    valueSerialized);
+            final byte[] valueSerialized = serializeWithDirtyHandle(consumedRow,
+                    DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
+            if (valueSerialized != null) {
+                return new ProducerRecord<>(
+                        getTargetTopic(consumedRow),
+                        extractPartition(consumedRow, null, valueSerialized),
+                        null,
+                        valueSerialized);
+            }
         }
         final byte[] keySerialized;
+        boolean mayDirtyData = false;
         if (keySerialization == null) {
             keySerialized = null;
         } else {
             final RowData keyRow = createProjectedRow(consumedRow, RowKind.INSERT, keyFieldGetters);
-            keySerialized = keySerialization.serialize(keyRow);
+            keySerialized = serializeWithDirtyHandle(keyRow, DirtyType.KEY_SERIALIZE_ERROR, keySerialization);
+            mayDirtyData = keySerialized == null;
         }
 
         final byte[] valueSerialized;
@@ -173,10 +191,16 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
             } else {
                 // make the message to be INSERT to be compliant with the INSERT-ONLY format
                 valueRow.setRowKind(RowKind.INSERT);
-                valueSerialized = valueSerialization.serialize(valueRow);
+                valueSerialized = serializeWithDirtyHandle(valueRow,
+                        DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
+                mayDirtyData = mayDirtyData || valueSerialized == null;
             }
         } else {
-            valueSerialized = valueSerialization.serialize(valueRow);
+            valueSerialized = serializeWithDirtyHandle(valueRow, DirtyType.VALUE_SERIALIZE_ERROR, valueSerialization);
+            mayDirtyData = mayDirtyData || valueSerialized == null;
+        }
+        if (mayDirtyData) {
+            return null;
         }
         return new ProducerRecord<>(
                 getTargetTopic(consumedRow),
@@ -187,6 +211,67 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
                 readMetadata(consumedRow, KafkaDynamicSink.WritableMetadata.HEADERS));
     }
 
+    private byte[] serializeWithDirtyHandle(RowData consumedRow, DirtyType dirtyType,
+            SerializationSchema<RowData> serialization) {
+        if (!dirtyOptions.ignoreDirty()) {
+            return serialization.serialize(consumedRow);
+        }
+        byte[] value = null;
+        try {
+            value = serialization.serialize(consumedRow);
+        } catch (Exception e) {
+            LOG.error(String.format("serialize error, raw data: %s", 
consumedRow.toString()), e);
+            if (dirtySink != null) {
+                DirtyData.Builder<Object> builder = DirtyData.builder();
+                try {
+                    builder.setData(consumedRow)
+                            .setDirtyType(dirtyType)
+                            .setLabels(dirtyOptions.getLabels())
+                            .setLogTag(dirtyOptions.getLogTag())
+                            .setIdentifier(dirtyOptions.getIdentifier());
+                    dirtySink.invoke(builder.build());
+                } catch (Exception ex) {
+                    if (!dirtyOptions.ignoreSideOutputErrors()) {
+                        throw new RuntimeException(ex);
+                    }
+                    LOG.warn("Dirty sink failed", ex);
+                }
+            }
+        }
+        return value;
+    }
+
+    private void serializeWithDirtyHandle(Map<String, Object> baseMap, JsonNode rootNode,
+            JsonNode dataNode, List<ProducerRecord<byte[], byte[]>> values) {
+        try {
+            byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
+            values.add(new ProducerRecord<>(
+                    jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
+                    extractPartition(null, null, data), null, data));
+        } catch (Exception e) {
+            LOG.error(String.format("serialize error, raw data: %s", baseMap), 
e);
+            if (!dirtyOptions.ignoreDirty()) {
+                throw new RuntimeException(e);
+            }
+            if (dirtySink != null) {
+                DirtyData.Builder<Object> builder = DirtyData.builder();
+                try {
+                    builder.setData(dataNode)
+                            .setDirtyType(DirtyType.VALUE_DESERIALIZE_ERROR)
+                            .setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels()))
+                            .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag()))
+                            .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier()));
+                    dirtySink.invoke(builder.build());
+                } catch (Exception ex) {
+                    if (!dirtyOptions.ignoreSideOutputErrors()) {
+                        throw new RuntimeException(ex);
+                    }
+                    LOG.warn("Dirty sink failed", ex);
+                }
+            }
+        }
+    }
+
     /**
      * Serialize for list it is used for multiple sink scenes when a record contains multiple real records.
      *
@@ -195,10 +280,14 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
      * @return List of ProducerRecord
      */
     public List<ProducerRecord<byte[], byte[]>> serializeForList(RowData consumedRow, @Nullable Long timestamp) {
+        List<ProducerRecord<byte[], byte[]>> values = new ArrayList<>();
         if (!multipleSink) {
-            return Collections.singletonList(serialize(consumedRow, timestamp));
+            ProducerRecord<byte[], byte[]> value = serialize(consumedRow, timestamp);
+            if (value != null) {
+                values.add(value);
+            }
+            return values;
         }
-        List<ProducerRecord<byte[], byte[]>> values = new ArrayList<>();
         try {
             JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(consumedRow.getBinary(0));
             boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
@@ -220,9 +309,27 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
             } else {
                 split2JsonArray(rootNode, updateBeforeNode, updateAfterNode, values);
             }
-        } catch (IOException e) {
-            LOG.warn("deserialize error", e);
-            values.add(new ProducerRecord<>(topic, null, null, consumedRow.getBinary(0)));
+        } catch (Exception e) {
+            LOG.error(String.format("serialize error, raw data: %s", new 
String(consumedRow.getBinary(0))), e);
+            if (!dirtyOptions.ignoreDirty()) {
+                throw new RuntimeException(e);
+            }
+            if (dirtySink != null) {
+                DirtyData.Builder<Object> builder = DirtyData.builder();
+                try {
+                    builder.setData(new String(consumedRow.getBinary(0)))
+                            .setDirtyType(DirtyType.VALUE_DESERIALIZE_ERROR)
+                            .setLabels(dirtyOptions.getLabels())
+                            .setLogTag(dirtyOptions.getLogTag())
+                            .setIdentifier(dirtyOptions.getIdentifier());
+                    dirtySink.invoke(builder.build());
+                } catch (Exception ex) {
+                    if (!dirtyOptions.ignoreSideOutputErrors()) {
+                        throw new RuntimeException(ex);
+                    }
+                    LOG.warn("Dirty sink failed", ex);
+                }
+            }
         }
         return values;
     }
@@ -261,27 +368,13 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
                 } else if (updateBeforeKey != null) {
                     baseMap.remove(updateBeforeKey);
                 }
-                try {
-                    byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
-                    values.add(new ProducerRecord<>(
-                            jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
-                            extractPartition(null, null, data), null, data));
-                } catch (Exception e) {
-                    throw new RuntimeException("serialize for list error", e);
-                }
+                serializeWithDirtyHandle(baseMap, rootNode, updateAfterNode.get(i), values);
             }
         } else {
             // In general, it will not run to this branch
             for (int i = 0; i < updateBeforeNode.size(); i++) {
                 baseMap.put(updateBeforeKey, Collections.singletonList(updateBeforeNode.get(i)));
-                try {
-                    byte[] data = jsonDynamicSchemaFormat.objectMapper.writeValueAsBytes(baseMap);
-                    values.add(new ProducerRecord<>(
-                            jsonDynamicSchemaFormat.parse(rootNode, topicPattern),
-                            extractPartition(null, null, data), null, data));
-                } catch (Exception e) {
-                    throw new RuntimeException("serialize for list error", e);
-                }
+                serializeWithDirtyHandle(baseMap, rootNode, updateBeforeNode.get(i), values);
             }
         }
     }
@@ -308,7 +401,7 @@ class DynamicKafkaSerializationSchema implements KafkaSerializationSchema<RowDat
                 // Extract the index '0' as the raw data is determined by the Raw format:
                 // The Raw format allows to read and write raw (byte based) values as a single column
                 return jsonDynamicSchemaFormat.parse(element.getBinary(0), topicPattern);
-            } catch (IOException e) {
+            } catch (Exception e) {
                 // Ignore the parse error and it will return the default topic final.
                 LOG.warn("parse dynamic topic error", e);
             }
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
index acfa5d8f4..63a1593f9 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java
@@ -37,6 +37,8 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.kafka.DynamicKafkaSerializationSchema.MetadataConverter;
 import org.apache.kafka.common.header.Header;
 import org.slf4j.Logger;
@@ -145,6 +147,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
      */
     private final String auditHostAndPorts;
     private @Nullable final String sinkMultipleFormat;
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
     /**
      * Metadata that is appended at the end of a physical sink row.
      */
@@ -176,7 +180,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
             String inlongMetric,
             String auditHostAndPorts,
             @Nullable String sinkMultipleFormat,
-            @Nullable String topicPattern) {
+            @Nullable String topicPattern,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         // Format attributes
         this.consumedDataType =
                 checkNotNull(consumedDataType, "Consumed data type must not be null.");
@@ -207,6 +213,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
         this.auditHostAndPorts = auditHostAndPorts;
         this.sinkMultipleFormat = sinkMultipleFormat;
         this.topicPattern = topicPattern;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     @Override
@@ -310,7 +318,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                         inlongMetric,
                         auditHostAndPorts,
                         sinkMultipleFormat,
-                        topicPattern);
+                        topicPattern,
+                        dirtyOptions,
+                        dirtySink);
         copy.metadataKeys = metadataKeys;
         return copy;
     }
@@ -422,7 +432,9 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                         metadataPositions,
                         upsertMode,
                         sinkMultipleFormat,
-                        topicPattern);
+                        topicPattern,
+                        dirtyOptions,
+                        dirtySink);
 
         return new FlinkKafkaProducer<>(
                 topic,
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
index dc218c900..5dd3fcaf7 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.kafka.table;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource;
 import org.apache.flink.table.data.GenericRowData;
@@ -28,10 +29,17 @@ import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
@@ -43,6 +51,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
 
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicKafkaDeserializationSchema.class);
+
     private final @Nullable DeserializationSchema<RowData> keyDeserialization;
 
     private final DeserializationSchema<RowData> valueDeserialization;
@@ -57,6 +67,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
 
     private final boolean upsertMode;
 
+    private final DirtyOptions dirtyOptions;
+    private final @Nullable DirtySink<String> dirtySink;
     private SourceMetricData metricData;
 
     DynamicKafkaDeserializationSchema(
@@ -68,7 +80,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
             boolean hasMetadata,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
-            boolean upsertMode) {
+            boolean upsertMode,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<String> dirtySink) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -87,6 +101,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
                         upsertMode);
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     public void setMetricData(SourceMetricData metricData) {
@@ -99,6 +115,9 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
             keyDeserialization.open(context);
         }
         valueDeserialization.open(context);
+        if (dirtySink != null) {
+            dirtySink.open(new Configuration());
+        }
     }
 
     @Override
@@ -117,7 +136,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
         // shortcut in case no output projection is required,
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
-            valueDeserialization.deserialize(record.value(), collector);
+            deserializeWithDirtyHandle(record.value(), DirtyType.VALUE_DESERIALIZE_ERROR,
+                    valueDeserialization, collector);
             // output metrics
             if (metricData != null) {
                 outputMetrics(record);
@@ -127,7 +147,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
 
         // buffer key(s)
         if (keyDeserialization != null) {
-            keyDeserialization.deserialize(record.key(), keyCollector);
+            deserializeWithDirtyHandle(record.key(), DirtyType.KEY_DESERIALIZE_ERROR,
+                    keyDeserialization, keyCollector);
         }
 
         // project output while emitting values
@@ -139,7 +160,8 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(record.value(), outputCollector);
+            deserializeWithDirtyHandle(record.value(), DirtyType.VALUE_DESERIALIZE_ERROR,
+                    valueDeserialization, outputCollector);
             // output metrics
             if (metricData != null) {
                 outputMetrics(record);
@@ -149,6 +171,35 @@ public class DynamicKafkaDeserializationSchema implements KafkaDeserializationSc
         keyCollector.buffer.clear();
     }
 
+    private void deserializeWithDirtyHandle(byte[] value, DirtyType dirtyType,
+            DeserializationSchema<RowData> deserialization, Collector<RowData> collector) throws IOException {
+        if (!dirtyOptions.ignoreDirty()) {
+            deserialization.deserialize(value, collector);
+        } else {
+            try {
+                deserialization.deserialize(value, collector);
+            } catch (IOException e) {
+                LOG.error(String.format("deserialize error, raw data: %s", new String(value)), e);
+                if (dirtySink != null) {
+                    DirtyData.Builder<String> builder = DirtyData.builder();
+                    try {
+                        builder.setData(new String(value))
+                                .setDirtyType(dirtyType)
+                                .setLabels(dirtyOptions.getLabels())
+                                .setLogTag(dirtyOptions.getLogTag())
+                                .setIdentifier(dirtyOptions.getIdentifier());
+                        dirtySink.invoke(builder.build());
+                    } catch (Exception ex) {
+                        if (!dirtyOptions.ignoreSideOutputErrors()) {
+                            throw new IOException(ex);
+                        }
+                        LOG.warn("Dirty sink failed", ex);
+                    }
+                }
+            }
+        }
+    }
+
     private void outputMetrics(ConsumerRecord<byte[], byte[]> record) {
         long dataSize = record.value() == null ? 0L : record.value().length;
         if (metricData != null) {
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
index 1d5932968..7525ffae1 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicSource.java
@@ -43,6 +43,8 @@ import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.kafka.FlinkKafkaConsumer;
 import org.apache.inlong.sort.kafka.table.DynamicKafkaDeserializationSchema.MetadataConverter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -51,7 +53,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
-
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -178,6 +179,9 @@ public class KafkaDynamicSource
 
     protected final String auditHostAndPorts;
 
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<String> dirtySink;
+
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaDynamicSource.class);
@@ -197,7 +201,9 @@ public class KafkaDynamicSource
             long startupTimestampMillis,
             boolean upsertMode,
             final String inlongMetric,
-            final String auditHostAndPorts) {
+            final String auditHostAndPorts,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<String> dirtySink) {
         // Format attributes
         this.physicalDataType =
                 Preconditions.checkNotNull(
@@ -232,6 +238,8 @@ public class KafkaDynamicSource
         this.upsertMode = upsertMode;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     @Override
@@ -322,7 +330,11 @@ public class KafkaDynamicSource
                         startupMode,
                         specificStartupOffsets,
                         startupTimestampMillis,
-                        upsertMode, inlongMetric, auditHostAndPorts);
+                        upsertMode,
+                        inlongMetric,
+                        auditHostAndPorts,
+                        dirtyOptions,
+                        dirtySink);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
@@ -427,7 +439,9 @@ public class KafkaDynamicSource
                         hasMetadata,
                         metadataConverters,
                         producedTypeInfo,
-                        upsertMode);
+                        upsertMode,
+                        dirtyOptions,
+                        dirtySink);
 
         final FlinkKafkaConsumer<RowData> kafkaConsumer;
         if (topics != null) {
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
index 5f9debe1d..b2e563193 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java
@@ -51,6 +51,9 @@ import org.apache.flink.table.formats.raw.RawFormatSerializationSchema;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.VarBinaryType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.kafka.KafkaDynamicSink;
 import org.apache.inlong.sort.kafka.partitioner.RawDataHashPartitioner;
@@ -95,6 +98,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.get
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableSourceOptions;
 import static org.apache.flink.table.factories.FactoryUtil.FORMAT;
 import static org.apache.flink.table.factories.FactoryUtil.SINK_PARALLELISM;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT;
@@ -322,7 +326,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
         final DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat =
                 getValueDecodingFormat(helper);
 
-        helper.validateExcept(PROPERTIES_PREFIX);
+        helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
 
         validateTableSourceOptions(tableOptions);
 
@@ -354,7 +358,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
         final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
 
         final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
-
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<String> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         return createKafkaTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -369,7 +375,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 startupOptions.specificOffsets,
                 startupOptions.startupTimestampMillis,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                dirtyOptions,
+                dirtySink);
     }
 
     @Override
@@ -387,7 +395,7 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 getValueEncodingFormat(helper);
 
         final String sinkMultipleFormat = getSinkMultipleFormat(helper);
-        helper.validateExcept(PROPERTIES_PREFIX);
+        helper.validateExcept(PROPERTIES_PREFIX, DIRTY_PREFIX);
 
         validateSinkPartitioner(tableOptions);
         validateSinkSemantic(tableOptions);
@@ -411,7 +419,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
         final String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
 
         final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
-
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         return createKafkaTableSink(
                 physicalDataType,
                 keyEncodingFormat.orElse(null),
@@ -428,13 +438,14 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 inlongMetric,
                 auditHostAndPorts,
                 sinkMultipleFormat,
-                tableOptions.getOptional(TOPIC_PATTERN).orElse(null));
+                tableOptions.getOptional(TOPIC_PATTERN).orElse(null),
+                dirtyOptions,
+                dirtySink);
     }
 
     private void validateSinkMultipleFormatAndPhysicalDataType(DataType physicalDataType,
             EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat, String sinkMultipleFormat) {
-        if (valueEncodingFormat instanceof RawFormatSerializationSchema
-                && StringUtils.isNotBlank(sinkMultipleFormat)) {
+        if (multipleSink(valueEncodingFormat, sinkMultipleFormat)) {
             DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
             Set<String> supportFormats = DynamicSchemaFormatFactory.SUPPORT_FORMATS.keySet();
             if (!supportFormats.contains(sinkMultipleFormat)) {
@@ -451,6 +462,12 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
         }
     }
 
+    private boolean multipleSink(EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat,
+            String sinkMultipleFormat) {
+        return valueEncodingFormat instanceof RawFormatSerializationSchema
+                && StringUtils.isNotBlank(sinkMultipleFormat);
+    }
+
     // --------------------------------------------------------------------------------------------
 
     protected KafkaDynamicSource createKafkaTableSource(
@@ -467,7 +484,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
             Map<KafkaTopicPartition, Long> specificStartupOffsets,
             long startupTimestampMillis,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<String> dirtySink) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -483,7 +502,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 startupTimestampMillis,
                 false,
                 inlongMetric,
-                auditHostAndPorts);
+                auditHostAndPorts,
+                dirtyOptions,
+                dirtySink);
     }
 
     protected KafkaDynamicSink createKafkaTableSink(
@@ -502,7 +523,9 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
             String inlongMetric,
             String auditHostAndPorts,
             @Nullable String sinkMultipleFormat,
-            @Nullable String topicPattern) {
+            @Nullable String topicPattern,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         return new KafkaDynamicSink(
                 physicalDataType,
                 physicalDataType,
@@ -522,6 +545,8 @@ public class KafkaDynamicTableFactory implements DynamicTableSourceFactory, Dyna
                 inlongMetric,
                 auditHostAndPorts,
                 sinkMultipleFormat,
-                topicPattern);
+                topicPattern,
+                dirtyOptions,
+                dirtySink);
     }
 }
\ No newline at end of file
diff --git a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
index b53565357..a5da814d0 100644
--- a/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -45,6 +45,9 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.RowKind;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import org.apache.inlong.sort.kafka.KafkaDynamicSink;
 
 import java.time.Duration;
@@ -67,6 +70,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.aut
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createKeyFormatProjection;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.createValueFormatProjection;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static org.apache.inlong.sort.kafka.table.KafkaOptions.KAFKA_IGNORE_ALL_CHANGELOG;
@@ -200,7 +204,7 @@ public class UpsertKafkaDynamicTableFactory
                 helper.discoverDecodingFormat(DeserializationFormatFactory.class, VALUE_FORMAT);
 
         // Validate the option data type.
-        helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+        helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX, DIRTY_PREFIX);
         TableSchema schema = context.getCatalogTable().getSchema();
         validateSource(tableOptions, keyDecodingFormat, valueDecodingFormat, schema);
 
@@ -210,7 +214,9 @@ public class UpsertKafkaDynamicTableFactory
         Properties properties = getKafkaProperties(context.getCatalogTable().getOptions());
         // always use earliest to keep data integrity
         StartupMode earliest = StartupMode.EARLIEST;
-
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<String> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         return new KafkaDynamicSource(
                 schema.toPhysicalRowDataType(),
                 keyDecodingFormat,
@@ -224,7 +230,11 @@ public class UpsertKafkaDynamicTableFactory
                 earliest,
                 Collections.emptyMap(),
                 0,
-                true, null, null);
+                true,
+                null,
+                null,
+                dirtyOptions,
+                dirtySink);
     }
 
     @Override
@@ -241,7 +251,7 @@ public class UpsertKafkaDynamicTableFactory
                 helper.discoverEncodingFormat(SerializationFormatFactory.class, VALUE_FORMAT);
 
         // Validate the option data type.
-        helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
+        helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX, DIRTY_PREFIX);
         TableSchema schema = context.getCatalogTable().getSchema();
         validateSink(tableOptions, keyEncodingFormat, valueEncodingFormat, schema);
 
@@ -258,6 +268,9 @@ public class UpsertKafkaDynamicTableFactory
                 new SinkBufferFlushMode(batchSize, batchInterval.toMillis());
         String inlongMetric = tableOptions.getOptional(INLONG_METRIC).orElse(null);
         final String auditHostAndPorts = tableOptions.getOptional(INLONG_AUDIT).orElse(null);
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(tableOptions);
+        final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
 
         // use {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner}.
         // it will use hash partition if key is set else in round-robin behaviour.
@@ -280,7 +293,9 @@ public class UpsertKafkaDynamicTableFactory
                 inlongMetric,
                 auditHostAndPorts,
                 null,
-                null);
+                null,
+                dirtyOptions,
+                dirtySink);
     }
 
     private Tuple2<int[], int[]> createKeyValueProjections(CatalogTable catalogTable) {
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
index 18b39eb57..52e3d6f6b 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/KafkaLoadSqlParseTest.java
@@ -22,6 +22,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.formats.common.VarBinaryFormatInfo;
 import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
@@ -45,6 +47,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -109,6 +112,97 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase {
                 "raw-hash", pattern);
     }
 
+    private KafkaExtractNode buildDirtyKafkaExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
+        JsonFormat jsonFormat = new JsonFormat();
+        jsonFormat.setIgnoreParseErrors(false);
+        return new KafkaExtractNode("1", "kafka_input", fields,
+                null, properties, "topic_dirty_input", "localhost:9092",
+                jsonFormat, KafkaScanStartupMode.EARLIEST_OFFSET, null,
+                "test_group", null, null);
+    }
+
+    private KafkaLoadNode buildDirtyKafkaLoadNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("id", new LongFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("name", new StringFormatInfo()));
+        List<FieldRelation> relations = Arrays.asList(
+                new FieldRelation(new FieldInfo("id", new LongFormatInfo()),
+                        new FieldInfo("id", new LongFormatInfo())),
+                new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                        new FieldInfo("age", new IntFormatInfo())),
+                new FieldRelation(new FieldInfo("name", new 
StringFormatInfo()),
+                        new FieldInfo("name", new StringFormatInfo())));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "s3");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
+        properties.put("dirty.side-output.s3.bucket", "s3-test-bucket");
+        properties.put("dirty.side-output.s3.endpoint", "s3.test.endpoint");
+        properties.put("dirty.side-output.s3.key", "dirty/test");
+        properties.put("dirty.side-output.s3.region", "region");
+        properties.put("dirty.side-output.s3.access-key-id", "access_key_id");
+        properties.put("dirty.side-output.s3.secret-key-id", "secret_key_id");
+        properties.put("dirty.identifier", "inlong-student-${SYSTEM_TIME}");
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+                null, "topic_dirty_output", "localhost:9092", new JsonFormat(),
+                null, properties, null);
+    }
+
+    private KafkaExtractNode buildDirtyKafkaExtractNodeWithRawFormat() {
+        List<FieldInfo> fields = Collections.singletonList(new FieldInfo("raw", new VarBinaryFormatInfo()));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=student");
+        return new KafkaExtractNode("1", "kafka_input", fields,
+                null, properties, "topic_dirty_input", "localhost:9092",
+                new RawFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null,
+                "test_group", null, null);
+    }
+
+    private KafkaLoadNode buildDirtyKafkaLoadNodeWithDynamicTopic() {
+        List<FieldInfo> fields = Collections.singletonList(new FieldInfo("raw", new VarBinaryFormatInfo()));
+        List<FieldRelation> relations = Collections.singletonList(
+                new FieldRelation(new FieldInfo("raw", new StringFormatInfo()),
+                        new FieldInfo("raw", new StringFormatInfo())));
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=${database}&table=${table}");
+        properties.put("dirty.identifier", 
"${database}-${table}-${SYSTEM_TIME}");
+        properties.put("dirty.side-output.s3.bucket", "s3-test-bucket");
+        properties.put("dirty.side-output.s3.endpoint", "s3.test.endpoint");
+        properties.put("dirty.side-output.s3.key", "dirty/test");
+        properties.put("dirty.side-output.s3.region", "region");
+        properties.put("dirty.side-output.s3.access-key-id", "access_key_id");
+        properties.put("dirty.side-output.s3.secret-key-id", "secret_key_id");
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null,
+                null, "topic_dirty_output", "localhost:9092", new RawFormat(),
+                null, properties, null, new CanalJsonFormat(), "topic_dirty_output",
+                null, null);
+    }
+
     /**
      * build node relation
      *
@@ -233,4 +327,60 @@ public class KafkaLoadSqlParseTest extends AbstractTestBase {
         ParseResult result = parser.parse();
         Assert.assertTrue(result.tryExecute());
     }
+
+    /**
+     * Test dirty handle of kafka source and kafka sink
+     * In this part it uses 'log' side-output for kafka source and 's3' side-output for kafka sink
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testKafkaDirtyHandleSqlParse() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildDirtyKafkaExtractNode();
+        Node outputNode = buildDirtyKafkaLoadNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, 
outputNode),
+                
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+
+    /**
+     * Test dirty handle of kafka source and kafka sink with dynamic topic
+     * In this part it uses 'log' side-output for kafka source and 's3' side-output for kafka sink
+     * @throws Exception The exception may be thrown when executing
+     */
+    @Test
+    public void testKafkaDirtyHandleWithDynamicTopicSqlParse() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings
+                .newInstance()
+                .useBlinkPlanner()
+                .inStreamingMode()
+                .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.enableCheckpointing(10000);
+        env.disableOperatorChaining();
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildDirtyKafkaExtractNodeWithRawFormat();
+        Node outputNode = buildDirtyKafkaLoadNodeWithDynamicTopic();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, 
outputNode),
+                
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+                        Collections.singletonList(outputNode))));
+        GroupInfo groupInfo = new GroupInfo("1", 
Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, 
groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
 }
