This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 08db5ea09 [INLONG-6765][Sort] Supports dirty data side-output for Iceberg sink (#6766)
08db5ea09 is described below

commit 08db5ea092f825ab44dace4534a9c67cc748800c
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Thu Dec 8 09:54:36 2022 +0800

    [INLONG-6765][Sort] Supports dirty data side-output for Iceberg sink (#6766)
---
 .../org/apache/inlong/sort/base/Constants.java     |  11 ++-
 .../apache/inlong/sort/base/dirty/DirtyData.java   |  41 +++++++-
 .../apache/inlong/sort/base/dirty/DirtyType.java   |  12 +++
 .../sort/base/dirty/sink/log/LogDirtySink.java     |  27 +++---
 .../sort/base/dirty/sink/s3/S3DirtySink.java       |  33 ++++---
 .../inlong/sort/base/dirty/utils/FormatUtils.java  |  30 ++++++
 .../inlong/sort/base/sink/MultipleSinkOption.java  |   1 -
 .../inlong/sort/base/dirty/FormatUtilsTest.java    |   4 +-
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  13 ++-
 .../inlong/sort/iceberg/IcebergTableSink.java      |  16 +++-
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  35 +++++--
 .../sink/multiple/DynamicSchemaHandleOperator.java | 106 ++++++++++++++++-----
 .../sink/multiple/IcebergMultipleStreamWriter.java |  19 +++-
 .../sink/multiple/IcebergSingleStreamWriter.java   |  60 ++++++++++--
 .../sort/parser/IcebergNodeSqlParserTest.java      |  11 ++-
 15 files changed, 328 insertions(+), 91 deletions(-)
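
For reference, the dirty side-output introduced here is driven by ordinary table
properties. A minimal sketch of enabling it, using the option keys from
Constants.java and the parser test in this commit (the label values are
illustrative, not prescribed):

import java.util.LinkedHashMap;
import java.util.Map;

public class DirtySideOutputOptionsExample {

    // Table properties that switch on the dirty data side-output for the Iceberg sink.
    public static Map<String, String> dirtyProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("dirty.ignore", "true");                // do not fail the job on dirty records
        properties.put("dirty.side-output.enable", "true");    // route dirty records to a side sink
        properties.put("dirty.side-output.connector", "log");  // 'log' or 's3'
        properties.put("dirty.side-output.format", "csv");     // 'csv' or 'json'
        properties.put("dirty.side-output.labels",
                "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_iceberg");
        return properties;
    }
}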

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 983b3c207..23676ec0f 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -227,11 +227,11 @@ public final class Constants {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "The identifier of dirty data, "
-                                    + "it will be used for filename generation 
of file dirty sink, "
+                            "The identifier of dirty data, it will be used for 
filename generation of file dirty sink, "
                                     + "topic generation of mq dirty sink, 
tablename generation of database, etc."
                                     + "and it supports variable replace like 
'${variable}'."
-                                    + "There are two system 
variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+                                    + "There are several system 
variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] "
+                                    + "are currently supported, "
                                     + "and the support of other variables is 
determined by the connector.");
     public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_ENABLE =
             ConfigOptions.key("dirty.side-output.enable")
@@ -266,7 +266,8 @@ public final class Constants {
                     .withDescription(
                             "The labels of dirty side-output, format is 
'key1=value1&key2=value2', "
                                     + "it supports variable replace like 
'${variable}',"
-                                    + "There are two system 
variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+                                    + "There are several system 
variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] "
+                                    + "are currently supported,"
                                     + " and the support of other variables is 
determined by the connector.");
     public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LOG_TAG =
             ConfigOptions.key("dirty.side-output.log-tag")
@@ -274,7 +275,7 @@ public final class Constants {
                     .defaultValue("DirtyData")
                     .withDescription(
                             "The log tag of dirty side-output, it supports 
variable replace like '${variable}'."
-                                    + "There are two system 
variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+                                    + "There are several system 
variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,"
                                     + " and the support of other variables is 
determined by the connector.");
     public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER 
=
             ConfigOptions.key("dirty.side-output.field-delimiter")
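
The '${variable}' replacement mentioned in these option descriptions is plain
placeholder substitution. A small sketch using the util that DirtyData already
imports (PatternReplaceUtils); the parameter values are made up:

import java.util.HashMap;
import java.util.Map;
import org.apache.inlong.sort.base.util.PatternReplaceUtils;

public class IdentifierReplaceExample {

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("SYSTEM_TIME", "2022-12-08 09:54:36");
        params.put("DIRTY_TYPE", "DeserializeError");
        params.put("DIRTY_MESSAGE", "Unexpected character");
        // Prints "dirty/2022-12-08 09:54:36/DeserializeError"
        System.out.println(PatternReplaceUtils.replace("dirty/${SYSTEM_TIME}/${DIRTY_TYPE}", params));
    }
}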
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
index a8b84f2b4..16f1cb762 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java
@@ -17,8 +17,10 @@
 
 package org.apache.inlong.sort.base.dirty;
 
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.inlong.sort.base.util.PatternReplaceUtils;
 
+import javax.annotation.Nullable;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
@@ -33,6 +35,8 @@ public class DirtyData<T> {
 
     private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
 
+    private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
+
     private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
 
     private static final DateTimeFormatter DATE_TIME_FORMAT = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@@ -41,7 +45,7 @@ public class DirtyData<T> {
      * The identifier of dirty data, it will be used for filename generation 
of file dirty sink,
      * topic generation of mq dirty sink, tablename generation of database, 
etc,
      * and it supports variable replace like '${variable}'.
-     * There are two system variables[SYSTEM_TIME|DIRTY_TYPE] are currently 
supported,
+     * There are several system 
variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported,
      * and the support of other variables is determined by the connector.
      */
     private final String identifier;
@@ -58,18 +62,31 @@ public class DirtyData<T> {
      * Dirty type
      */
     private final DirtyType dirtyType;
+    /**
+     * The dirty message that describes the cause of the dirty data
+     */
+    private final String dirtyMessage;
+    /**
+     * The row type of the data; it is only used when the data is a 'RowData'
+     */
+    private @Nullable final LogicalType rowType;
     /**
      * The real dirty data
      */
     private final T data;
 
-    public DirtyData(T data, String identifier, String labels, String logTag, 
DirtyType dirtyType) {
+    public DirtyData(T data, String identifier, String labels,
+            String logTag, DirtyType dirtyType, String dirtyMessage,
+            @Nullable LogicalType rowType) {
         this.data = data;
         this.dirtyType = dirtyType;
+        this.dirtyMessage = dirtyMessage;
+        this.rowType = rowType;
         Map<String, String> paramMap = genParamMap();
         this.labels = PatternReplaceUtils.replace(labels, paramMap);
         this.logTag = PatternReplaceUtils.replace(logTag, paramMap);
         this.identifier = PatternReplaceUtils.replace(identifier, paramMap);
+
     }
 
     public static <T> Builder<T> builder() {
@@ -80,6 +97,7 @@ public class DirtyData<T> {
         Map<String, String> paramMap = new HashMap<>();
         paramMap.put(SYSTEM_TIME_KEY, 
DATE_TIME_FORMAT.format(LocalDateTime.now()));
         paramMap.put(DIRTY_TYPE_KEY, dirtyType.format());
+        paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
         return paramMap;
     }
 
@@ -103,12 +121,19 @@ public class DirtyData<T> {
         return identifier;
     }
 
+    @Nullable
+    public LogicalType getRowType() {
+        return rowType;
+    }
+
     public static class Builder<T> {
 
         private String identifier;
         private String labels;
         private String logTag;
         private DirtyType dirtyType = DirtyType.UNDEFINED;
+        private String dirtyMessage;
+        private LogicalType rowType;
         private T data;
 
         public Builder<T> setDirtyType(DirtyType dirtyType) {
@@ -136,8 +161,18 @@ public class DirtyData<T> {
             return this;
         }
 
+        public Builder<T> setDirtyMessage(String dirtyMessage) {
+            this.dirtyMessage = dirtyMessage;
+            return this;
+        }
+
+        public Builder<T> setRowType(LogicalType rowType) {
+            this.rowType = rowType;
+            return this;
+        }
+
         public DirtyData<T> build() {
-            return new DirtyData<>(data, identifier, labels, logTag, 
dirtyType);
+            return new DirtyData<>(data, identifier, labels, logTag, 
dirtyType, dirtyMessage, rowType);
         }
     }
 }
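
As a quick illustration of the extended builder (the setter names follow the Builder
above; the sample row, row type, and label string are hypothetical):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyType;

public class DirtyDataBuilderExample {

    public static DirtyData<RowData> buildDirtyRecord(Exception cause) {
        RowData row = GenericRowData.of(StringData.fromString("jo"), 18);
        RowType rowType = RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType());
        // ${SYSTEM_TIME}, ${DIRTY_TYPE} and ${DIRTY_MESSAGE} are substituted from genParamMap(),
        // so the labels below end up carrying the failure cause of this record.
        return DirtyData.<RowData>builder()
                .setData(row)
                .setDirtyType(DirtyType.EXTRACT_ROWDATA_ERROR)
                .setDirtyMessage(cause.getMessage())
                .setRowType(rowType) // lets a log/s3 dirty sink derive field getters for this row
                .setLabels("SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&DIRTY_MESSAGE=${DIRTY_MESSAGE}")
                .setLogTag("DirtyData")
                .setIdentifier("${SYSTEM_TIME}")
                .build();
    }
}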
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
index afe3e0d17..0637725c3 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
@@ -76,6 +76,18 @@ public enum DirtyType {
      * Json process error
      */
     JSON_PROCESS_ERROR("JsonProcessError"),
+    /**
+     * Table identifier parse error
+     */
+    TABLE_IDENTIFIER_PARSE_ERROR("TableIdentifierParseError"),
+    /**
+     * Extract schema error
+     */
+    EXTRACT_SCHEMA_ERROR("ExtractSchemaError"),
+    /**
+     * Extract RowData error
+     */
+    EXTRACT_ROWDATA_ERROR("ExtractRowDataError"),
     ;
 
     private final String format;
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
index a57c981fe..b7f7af914 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
-import static org.apache.flink.table.data.RowData.createFieldGetter;
 
 /**
  * Log dirty sink that is used to print log
@@ -48,7 +47,7 @@ public class LogDirtySink<T> implements DirtySink<T> {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(LogDirtySink.class);
 
-    private final RowData.FieldGetter[] fieldGetters;
+    private RowData.FieldGetter[] fieldGetters;
     private final String format;
     private final String fieldDelimiter;
     private final DataType physicalRowDataType;
@@ -58,18 +57,13 @@ public class LogDirtySink<T> implements DirtySink<T> {
         this.format = format;
         this.fieldDelimiter = fieldDelimiter;
         this.physicalRowDataType = physicalRowDataType;
-        final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
-                
.stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
-        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
-        for (int i = 0; i < logicalTypes.length; i++) {
-            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
-        }
     }
 
     @Override
     public void open(Configuration configuration) throws Exception {
         converter = new RowDataToJsonConverters(TimestampFormat.SQL, 
MapNullKeyMode.DROP, null)
                 .createConverter(physicalRowDataType.getLogicalType());
+        fieldGetters = 
FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
     }
 
     @Override
@@ -78,7 +72,7 @@ public class LogDirtySink<T> implements DirtySink<T> {
         Map<String, String> labelMap = 
LabelUtils.parseLabels(dirtyData.getLabels());
         T data = dirtyData.getData();
         if (data instanceof RowData) {
-            value = format((RowData) data, labelMap);
+            value = format((RowData) data, dirtyData.getRowType(), labelMap);
         } else if (data instanceof JsonNode) {
             value = format((JsonNode) data, labelMap);
         } else {
@@ -88,14 +82,23 @@ public class LogDirtySink<T> implements DirtySink<T> {
         LOGGER.info("[{}] {}", dirtyData.getLogTag(), value);
     }
 
-    private String format(RowData data, Map<String, String> labels) throws 
JsonProcessingException {
+    private String format(RowData data, LogicalType rowType,
+            Map<String, String> labels) throws JsonProcessingException {
         String value;
         switch (format) {
             case "csv":
-                value = FormatUtils.csvFormat(data, fieldGetters, labels, 
fieldDelimiter);
+                RowData.FieldGetter[] getters = fieldGetters;
+                if (rowType != null) {
+                    getters = FormatUtils.parseFieldGetters(rowType);
+                }
+                value = FormatUtils.csvFormat(data, getters, labels, 
fieldDelimiter);
                 break;
             case "json":
-                value = FormatUtils.jsonFormat(data, converter, labels);
+                RowDataToJsonConverter jsonConverter = converter;
+                if (rowType != null) {
+                    jsonConverter = 
FormatUtils.parseRowDataToJsonConverter(rowType);
+                }
+                value = FormatUtils.jsonFormat(data, jsonConverter, labels);
                 break;
             default:
                 throw new UnsupportedOperationException(
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
index 3cb20c7b8..ab8fc9464 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -24,9 +24,6 @@ import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.formats.common.TimestampFormat;
-import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
-import org.apache.flink.formats.json.RowDataToJsonConverters;
 import 
org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -53,7 +50,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.flink.table.data.RowData.createFieldGetter;
 
 /**
  * S3 dirty sink that is used to sink dirty data to s3
@@ -72,7 +68,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
     private final AtomicLong writeOutNum = new AtomicLong(0);
     private final AtomicLong errorNum = new AtomicLong(0);
     private final DataType physicalRowDataType;
-    private final RowData.FieldGetter[] fieldGetters;
+    private RowData.FieldGetter[] fieldGetters;
     private RowDataToJsonConverter converter;
     private long batchBytes = 0L;
     private int size;
@@ -85,18 +81,12 @@ public class S3DirtySink<T> implements DirtySink<T> {
     public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) {
         this.s3Options = s3Options;
         this.physicalRowDataType = physicalRowDataType;
-        final LogicalType[] logicalTypes = physicalRowDataType.getChildren()
-                
.stream().map(DataType::getLogicalType).toArray(LogicalType[]::new);
-        this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
-        for (int i = 0; i < logicalTypes.length; i++) {
-            fieldGetters[i] = createFieldGetter(logicalTypes[i], i);
-        }
     }
 
     @Override
     public void open(Configuration configuration) throws Exception {
-        converter = new RowDataToJsonConverters(TimestampFormat.SQL, 
MapNullKeyMode.DROP, null)
-                .createConverter(physicalRowDataType.getLogicalType());
+        converter = 
FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType());
+        fieldGetters = 
FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType());
         AmazonS3 s3Client;
         if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() 
!= null) {
             BasicAWSCredentials awsCreds =
@@ -149,7 +139,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
         Map<String, String> labelMap = 
LabelUtils.parseLabels(dirtyData.getLabels());
         T data = dirtyData.getData();
         if (data instanceof RowData) {
-            value = format((RowData) data, labelMap);
+            value = format((RowData) data, dirtyData.getRowType(), labelMap);
         } else if (data instanceof JsonNode) {
             value = format((JsonNode) data, labelMap);
         } else {
@@ -164,14 +154,23 @@ public class S3DirtySink<T> implements DirtySink<T> {
         batchMap.computeIfAbsent(dirtyData.getIdentifier(), k -> new 
ArrayList<>()).add(value);
     }
 
-    private String format(RowData data, Map<String, String> labels) throws 
JsonProcessingException {
+    private String format(RowData data, LogicalType rowType,
+            Map<String, String> labels) throws JsonProcessingException {
         String value;
         switch (s3Options.getFormat()) {
             case "csv":
-                value = FormatUtils.csvFormat(data, fieldGetters, labels, 
s3Options.getFieldDelimiter());
+                RowData.FieldGetter[] getters = fieldGetters;
+                if (rowType != null) {
+                    getters = FormatUtils.parseFieldGetters(rowType);
+                }
+                value = FormatUtils.csvFormat(data, getters, labels, 
s3Options.getFieldDelimiter());
                 break;
             case "json":
-                value = FormatUtils.jsonFormat(data, converter, labels);
+                RowDataToJsonConverter jsonConverter = converter;
+                if (rowType != null) {
+                    jsonConverter = 
FormatUtils.parseRowDataToJsonConverter(rowType);
+                }
+                value = FormatUtils.jsonFormat(data, jsonConverter, labels);
                 break;
             default:
                 throw new UnsupportedOperationException(
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
index 3220837dd..58260bae1 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java
@@ -17,18 +17,24 @@
 
 package org.apache.inlong.sort.base.dirty.utils;
 
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode;
+import org.apache.flink.formats.json.RowDataToJsonConverters;
 import 
org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.LogicalType;
 
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.StringJoiner;
+import static org.apache.flink.table.data.RowData.createFieldGetter;
 
 /**
  * Format utils
@@ -49,6 +55,30 @@ public final class FormatUtils {
     private FormatUtils() {
     }
 
+    /**
+     * Parse FieldGetter from LogicalType
+     * @param rowType The row type
+     * @return An array of FieldGetter
+     */
+    public static RowData.FieldGetter[] parseFieldGetters(LogicalType rowType) 
{
+        List<LogicalType> logicalTypes = rowType.getChildren();
+        RowData.FieldGetter[] fieldGetters = new 
RowData.FieldGetter[logicalTypes.size()];
+        for (int i = 0; i < logicalTypes.size(); i++) {
+            fieldGetters[i] = createFieldGetter(logicalTypes.get(i), i);
+        }
+        return fieldGetters;
+    }
+
+    /**
+     * Parse RowDataToJsonConverter
+     * @param rowType The row type
+     * @return RowDataToJsonConverter
+     */
+    public static RowDataToJsonConverter 
parseRowDataToJsonConverter(LogicalType rowType) {
+        return new RowDataToJsonConverters(TimestampFormat.SQL, 
MapNullKeyMode.DROP, null)
+                .createConverter(rowType);
+    }
+
     /**
      * Csv format for 'RowData'
      *
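
A short sketch of how the two new helpers fit together when a dirty record arrives
with its own row type (the sample schema and row are made up; only the FormatUtils
calls come from this commit):

import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.inlong.sort.base.dirty.utils.FormatUtils;

import java.util.Collections;

public class FormatUtilsExample {

    public static void main(String[] args) throws Exception {
        RowType rowType = RowType.of(new VarCharType(VarCharType.MAX_LENGTH), new IntType());
        RowData row = GenericRowData.of(StringData.fromString("jo"), 18);

        // csv: field getters are derived from the per-record row type.
        RowData.FieldGetter[] getters = FormatUtils.parseFieldGetters(rowType);
        String csv = FormatUtils.csvFormat(row, getters, Collections.emptyMap(), ",");

        // json: a converter is derived from the same row type.
        RowDataToJsonConverter converter = FormatUtils.parseRowDataToJsonConverter(rowType);
        String json = FormatUtils.jsonFormat(row, converter, Collections.emptyMap());

        System.out.println(csv);
        System.out.println(json);
    }
}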
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
index 4e3cfdf9b..d37d5dd28 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java
@@ -149,7 +149,6 @@ public class MultipleSinkOption implements Serializable {
             LOG.warn("Ignore table {} schema change: {}.", tableName, 
tableChange);
             return false;
         }
-
         throw new UnsupportedOperationException(
                 String.format("Unsupported table %s schema change: %s.", 
tableName, tableChange));
     }
diff --git 
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
 
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
index 386766ddd..80363ba8b 100644
--- 
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
+++ 
b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.RowData.FieldGetter;
 import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.inlong.sort.base.dirty.utils.FormatUtils;
 import org.junit.Assert;
@@ -41,7 +40,6 @@ import org.junit.Test;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import static org.apache.flink.table.data.RowData.createFieldGetter;
 
 /**
@@ -64,7 +62,7 @@ public class FormatUtilsTest {
                 Column.physical("name", DataTypes.STRING()),
                 Column.physical("age", DataTypes.INT()));
         List<LogicalType> logicalTypes = schema.toPhysicalRowDataType()
-                
.getChildren().stream().map(DataType::getLogicalType).collect(Collectors.toList());
+                .getLogicalType().getChildren();
         fieldGetters = new RowData.FieldGetter[logicalTypes.size()];
         for (int i = 0; i < logicalTypes.size(); i++) {
             fieldGetters[i] = createFieldGetter(logicalTypes.get(i), i);
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index 11ed1113f..8553ce94a 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -45,6 +45,9 @@ import org.apache.iceberg.flink.IcebergTableSource;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 
 import java.util.Map;
 import java.util.Set;
@@ -224,12 +227,15 @@ public class FlinkDynamicTableFactory implements 
DynamicTableSinkFactory, Dynami
         Map<String, String> tableProps = catalogTable.getOptions();
         TableSchema tableSchema = 
TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema());
         ActionsProvider actionsLoader = 
createActionLoader(context.getClassLoader(), tableProps);
-
+        // Build the dirty data side-output
+        final DirtyOptions dirtyOptions = 
DirtyOptions.fromConfig(Configuration.fromMap(tableProps));
+        final DirtySink<Object> dirtySink = 
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
         boolean multipleSink = Boolean.parseBoolean(
                 tableProps.getOrDefault(SINK_MULTIPLE_ENABLE.key(), 
SINK_MULTIPLE_ENABLE.defaultValue().toString()));
         if (multipleSink) {
             CatalogLoader catalogLoader = createCatalogLoader(tableProps);
-            return new IcebergTableSink(null, tableSchema, catalogTable, 
catalogLoader, actionsLoader);
+            return new IcebergTableSink(null, tableSchema, catalogTable,
+                    catalogLoader, actionsLoader, dirtyOptions, dirtySink);
         } else {
             TableLoader tableLoader;
             if (catalog != null) {
@@ -238,7 +244,8 @@ public class FlinkDynamicTableFactory implements 
DynamicTableSinkFactory, Dynami
                 tableLoader = createTableLoader(catalogTable, tableProps, 
objectPath.getDatabaseName(),
                         objectPath.getObjectName());
             }
-            return new IcebergTableSink(tableLoader, tableSchema, 
catalogTable, null, actionsLoader);
+            return new IcebergTableSink(tableLoader, tableSchema, catalogTable,
+                    null, actionsLoader, dirtyOptions, dirtySink);
         }
     }
 
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index bd65c76e9..31698c0de 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -35,11 +35,14 @@ import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import org.apache.inlong.sort.iceberg.sink.FlinkSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.util.List;
 import java.util.Map;
 
@@ -74,6 +77,9 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
 
     private boolean overwrite = false;
 
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
+
     private IcebergTableSink(IcebergTableSink toCopy) {
         this.tableLoader = toCopy.tableLoader;
         this.tableSchema = toCopy.tableSchema;
@@ -81,18 +87,24 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
         this.catalogTable = toCopy.catalogTable;
         this.catalogLoader = toCopy.catalogLoader;
         this.actionsProvider = toCopy.actionsProvider;
+        this.dirtyOptions = toCopy.dirtyOptions;
+        this.dirtySink = toCopy.dirtySink;
     }
 
     public IcebergTableSink(TableLoader tableLoader,
             TableSchema tableSchema,
             CatalogTable catalogTable,
             CatalogLoader catalogLoader,
-            ActionsProvider actionsProvider) {
+            ActionsProvider actionsProvider,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         this.tableLoader = tableLoader;
         this.tableSchema = tableSchema;
         this.catalogTable = catalogTable;
         this.catalogLoader = catalogLoader;
         this.actionsProvider = actionsProvider;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     @Override
@@ -130,6 +142,8 @@ public class IcebergTableSink implements DynamicTableSink, 
SupportsPartitioning,
                     .overwrite(overwrite)
                     .appendMode(tableOptions.get(IGNORE_ALL_CHANGELOG))
                     .metric(tableOptions.get(INLONG_METRIC), 
tableOptions.get(INLONG_AUDIT))
+                    .dirtyOptions(dirtyOptions)
+                    .dirtySink(dirtySink)
                     .action(actionsProvider)
                     .append();
         }
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index dd1c43275..a32db160a 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -52,6 +52,8 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
 import 
org.apache.inlong.sort.iceberg.sink.multiple.IcebergMultipleFilesCommiter;
 import 
org.apache.inlong.sort.iceberg.sink.multiple.IcebergMultipleStreamWriter;
@@ -64,6 +66,7 @@ import 
org.apache.inlong.sort.iceberg.sink.multiple.DynamicSchemaHandleOperator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
@@ -166,6 +169,8 @@ public class FlinkSink {
         private CatalogLoader catalogLoader = null;
         private boolean multipleSink = false;
         private MultipleSinkOption multipleSinkOption = null;
+        private DirtyOptions dirtyOptions;
+        private @Nullable DirtySink<Object> dirtySink;
 
         private Builder() {
         }
@@ -283,6 +288,16 @@ public class FlinkSink {
             return this;
         }
 
+        public Builder dirtyOptions(DirtyOptions dirtyOptions) {
+            this.dirtyOptions = dirtyOptions;
+            return this;
+        }
+
+        public Builder dirtySink(DirtySink<Object> dirtySink) {
+            this.dirtySink = dirtySink;
+            return this;
+        }
+
         /**
          * Configure the write {@link DistributionMode} that the flink sink 
will use. Currently, flink support
          * {@link DistributionMode#NONE} and {@link DistributionMode#HASH}.
@@ -514,7 +529,8 @@ public class FlinkSink {
             }
 
             IcebergProcessOperator<RowData, WriteResult> streamWriter = 
createStreamWriter(
-                    table, flinkRowType, equalityFieldIds, upsertMode, 
appendMode, inlongMetric, auditHostAndPorts);
+                    table, flinkRowType, equalityFieldIds, upsertMode, 
appendMode, inlongMetric,
+                    auditHostAndPorts, dirtyOptions, dirtySink);
 
             int parallelism = writeParallelism == null ? 
input.getParallelism() : writeParallelism;
             SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -534,8 +550,7 @@ public class FlinkSink {
 
             int parallelism = writeParallelism == null ? 
input.getParallelism() : writeParallelism;
             DynamicSchemaHandleOperator routeOperator = new 
DynamicSchemaHandleOperator(
-                    catalogLoader,
-                    multipleSinkOption);
+                    catalogLoader, multipleSinkOption, dirtyOptions, 
dirtySink);
             SingleOutputStreamOperator<RecordWithSchema> routeStream = input
                     
.transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
                             TypeInformation.of(RecordWithSchema.class),
@@ -544,7 +559,8 @@ public class FlinkSink {
 
             IcebergProcessOperator streamWriter =
                     new IcebergProcessOperator(new IcebergMultipleStreamWriter(
-                            appendMode, catalogLoader, inlongMetric, 
auditHostAndPorts, multipleSinkOption));
+                            appendMode, catalogLoader, inlongMetric, 
auditHostAndPorts,
+                            multipleSinkOption, dirtyOptions, dirtySink));
             SingleOutputStreamOperator<MultipleWriteResult> writerStream = 
routeStream
                     
.transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME),
                             TypeInformation.of(IcebergProcessOperator.class),
@@ -618,7 +634,10 @@ public class FlinkSink {
             boolean upsert,
             boolean appendMode,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         Preconditions.checkArgument(table != null, "Iceberg table should't be 
null");
         Map<String, String> props = table.properties();
         long targetFileSize = getTargetFileSizeBytes(props);
@@ -628,9 +647,11 @@ public class FlinkSink {
         TaskWriterFactory<RowData> taskWriterFactory = new 
RowDataTaskWriterFactory(
                 serializableTable, serializableTable.schema(), flinkRowType, 
targetFileSize,
                 fileFormat, equalityFieldIds, upsert, appendMode);
-
+        // Pass null as the flinkRowType of IcebergSingleStreamWriter so the dirty sink
+        // reuses the field getters built from its own schema instead of recreating them per record.
         return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
-                table.name(), taskWriterFactory, inlongMetric, 
auditHostAndPorts));
+                table.name(), taskWriterFactory, inlongMetric, 
auditHostAndPorts,
+                null, dirtyOptions, dirtySink));
     }
 
     private static FileFormat getFileFormat(Map<String, String> properties) {
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index f8422add6..571221257 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -19,7 +19,6 @@
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -39,6 +38,10 @@ import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types.NestedField;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtyType;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
@@ -47,6 +50,7 @@ import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
@@ -63,10 +67,10 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
             OneInputStreamOperator<RowData, RecordWithSchema>,
             ProcessingTimeCallback {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(DynamicSchemaHandleOperator.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DynamicSchemaHandleOperator.class);
     private static final long HELPER_DEBUG_INTERVEL = 10 * 60 * 1000;
+    private static final long serialVersionUID = 1L;
 
-    private final ObjectMapper objectMapper = new ObjectMapper();
     private final CatalogLoader catalogLoader;
     private final MultipleSinkOption multipleSinkOption;
 
@@ -84,18 +88,26 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
     // blacklist to filter schema update failed table
     private transient Set<TableIdentifier> blacklist;
 
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
+
     public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
-            MultipleSinkOption multipleSinkOption) {
+            MultipleSinkOption multipleSinkOption, DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         this.catalogLoader = catalogLoader;
         this.multipleSinkOption = multipleSinkOption;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public void open() throws Exception {
         super.open();
         this.catalog = catalogLoader.loadCatalog();
         this.asNamespaceCatalog =
                 catalog instanceof SupportsNamespaces ? (SupportsNamespaces) 
catalog : null;
+
         this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat(
                 multipleSinkOption.getFormat(), 
multipleSinkOption.getFormatOption());
 
@@ -118,13 +130,25 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
 
     @Override
     public void processElement(StreamRecord<RowData> element) throws Exception 
{
-        JsonNode jsonNode = 
dynamicSchemaFormat.deserialize(element.getValue().getBinary(0));
-
-        TableIdentifier tableId = parseId(jsonNode);
+        JsonNode jsonNode = null;
+        try {
+            jsonNode = 
dynamicSchemaFormat.deserialize(element.getValue().getBinary(0));
+        } catch (Exception e) {
+            LOGGER.error(String.format("Deserialize error, raw data: %s",
+                    new String(element.getValue().getBinary(0))), e);
+            handleDirtyData(new String(element.getValue().getBinary(0)),
+                    null, DirtyType.DESERIALIZE_ERROR, e);
+        }
+        TableIdentifier tableId = null;
+        try {
+            tableId = parseId(jsonNode);
+        } catch (Exception e) {
+            LOGGER.error(String.format("Table identifier parse error, raw 
data: %s", jsonNode), e);
+            handleDirtyData(jsonNode, jsonNode, 
DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, e);
+        }
         if (blacklist.contains(tableId)) {
             return;
         }
-
         boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
         if (isDDL) {
             execDDL(jsonNode, tableId);
@@ -133,6 +157,41 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
         }
     }
 
+    private void handleDirtyData(Object dirtyData, JsonNode rootNode, 
DirtyType dirtyType, Exception e) {
+        if (!dirtyOptions.ignoreDirty()) {
+            RuntimeException ex;
+            if (e instanceof RuntimeException) {
+                ex = (RuntimeException) e;
+            } else {
+                ex = new RuntimeException(e);
+            }
+            throw ex;
+        }
+        if (dirtySink != null) {
+            DirtyData.Builder<Object> builder = DirtyData.builder();
+            try {
+                builder.setData(dirtyData)
+                        .setDirtyType(dirtyType)
+                        .setDirtyMessage(e.getMessage());
+                if (rootNode != null) {
+                    builder.setLabels(dynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getLabels()))
+                            .setLogTag(dynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getLogTag()))
+                            .setIdentifier(dynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getIdentifier()));
+                } else {
+                    builder.setLabels(dirtyOptions.getLabels())
+                            .setLogTag(dirtyOptions.getLogTag())
+                            .setIdentifier(dirtyOptions.getIdentifier());
+                }
+                dirtySink.invoke(builder.build());
+            } catch (Exception ex) {
+                if (!dirtyOptions.ignoreSideOutputErrors()) {
+                    throw new RuntimeException(ex);
+                }
+                LOGGER.warn("Dirty sink failed", ex);
+            }
+        }
+    }
+
     @Override
     public void onProcessingTime(long timestamp) {
         LOG.info("Black list table: {} at time {}.", blacklist, timestamp);
@@ -146,6 +205,9 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
 
     private void execDML(JsonNode jsonNode, TableIdentifier tableId) {
         RecordWithSchema record = parseRecord(jsonNode, tableId);
+        if (record == null) {
+            return;
+        }
         Schema schema = schemaCache.get(record.getTableId());
         Schema dataSchema = record.getSchema();
         recordQueues.compute(record.getTableId(), (k, v) -> {
@@ -155,7 +217,6 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
             v.add(record);
             return v;
         });
-
         if (schema == null) {
             handleTableCreateEventFromOperator(record.getTableId(), 
dataSchema);
         } else {
@@ -182,6 +243,7 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                                 LOG.warn("Ignore table {} schema change, old: 
{} new: {}.",
                                         tableId, dataSchema, latestSchema, e);
                                 blacklist.add(tableId);
+                                handleDirtyData(jsonNode, jsonNode, 
DirtyType.EXTRACT_ROWDATA_ERROR, e);
                             }
                             return Collections.emptyList();
                         });
@@ -204,13 +266,11 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                     LOG.warn("Database({}) already exist in Catalog({})!", 
tableId.namespace(), catalog.name());
                 }
             }
-
             ImmutableMap.Builder<String, String> properties = 
ImmutableMap.builder();
             properties.put("format-version", "2");
             properties.put("write.upsert.enabled", "true");
             // for hive visible
             properties.put("engine.hive.enabled", "true");
-
             try {
                 catalog.createTable(tableId, schema, 
PartitionSpec.unpartitioned(), properties.build());
                 LOG.info("Auto create Table({}) in Database({}) in 
Catalog({})!",
@@ -220,13 +280,11 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                         tableId.name(), tableId.namespace(), catalog.name());
             }
         }
-
         handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema());
     }
 
     private void handldAlterSchemaEventFromOperator(TableIdentifier tableId, 
Schema oldSchema, Schema newSchema) {
         Table table = catalog.loadTable(tableId);
-
         // The transactionality of changes is guaranteed by comparing the old 
schema with the current schema of the
         // table.
         // Judging whether changes can be made by schema comparison (currently 
only column additions are supported),
@@ -263,15 +321,18 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
     }
 
     private RecordWithSchema parseRecord(JsonNode data, TableIdentifier 
tableId) {
-        List<String> pkListStr = 
dynamicSchemaFormat.extractPrimaryKeyNames(data);
-        RowType schema = dynamicSchemaFormat.extractSchema(data, pkListStr);
-
-        RecordWithSchema record = new RecordWithSchema(
-                data,
-                FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(schema)),
-                tableId,
-                pkListStr);
-        return record;
+        try {
+            List<String> pkListStr = 
dynamicSchemaFormat.extractPrimaryKeyNames(data);
+            RowType schema = dynamicSchemaFormat.extractSchema(data, 
pkListStr);
+            return new RecordWithSchema(
+                    data,
+                    FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(schema)),
+                    tableId,
+                    pkListStr);
+        } catch (Exception e) {
+            handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e);
+        }
+        return null;
     }
 
     private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, 
List<TableChange> tableChanges) {
@@ -290,7 +351,6 @@ public class DynamicSchemaHandleOperator extends 
AbstractStreamOperator<RecordWi
                 break;
             }
         }
-
         return canHandle;
     }
 }
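
For clarity, a hedged restatement of the policy that handleDirtyData applies to a
failed record (this helper is illustrative, not part of the commit; the option
accessors are the ones used above):

import org.apache.inlong.sort.base.dirty.DirtyData;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;

public class DirtyPolicySketch {

    static void applyDirtyPolicy(DirtyOptions options, DirtySink<Object> dirtySink,
            DirtyData<Object> dirty, Exception cause) throws Exception {
        if (!options.ignoreDirty()) {
            throw cause;              // dirty data is fatal: fail the job fast
        }
        if (dirtySink == null) {
            return;                   // dirty data is ignored and simply dropped
        }
        try {
            dirtySink.invoke(dirty);  // side-output the record to the log/s3 dirty sink
        } catch (Exception ex) {
            if (!options.ignoreSideOutputErrors()) {
                throw ex;             // side-output failures are fatal too
            }
            // otherwise the failure is only logged and processing continues
        }
    }
}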
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
index 16bedfd83..9246ffeed 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -38,6 +39,8 @@ import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
@@ -95,18 +98,24 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
     private transient SinkMetricData metricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
 
     public IcebergMultipleStreamWriter(
             boolean appendMode,
             CatalogLoader catalogLoader,
             String inlongMetric,
             String auditHostAndPorts,
-            MultipleSinkOption multipleSinkOption) {
+            MultipleSinkOption multipleSinkOption,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         this.appendMode = appendMode;
         this.catalogLoader = catalogLoader;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
         this.multipleSinkOption = multipleSinkOption;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     @Override
@@ -182,11 +191,11 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
                         .map(NestedField::fieldId)
                         .collect(Collectors.toList());
             }
-
+            RowType flinkRowType = 
FlinkSchemaUtil.convert(recordWithSchema.getSchema());
             TaskWriterFactory<RowData> taskWriterFactory = new 
RowDataTaskWriterFactory(
                     table,
                     recordWithSchema.getSchema(),
-                    FlinkSchemaUtil.convert(recordWithSchema.getSchema()),
+                    flinkRowType,
                     targetFileSizeBytes,
                     fileFormat,
                     equalityFieldIds,
@@ -195,7 +204,8 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
 
             if (multipleWriters.get(tableId) == null) {
                 IcebergSingleStreamWriter<RowData> writer = new 
IcebergSingleStreamWriter<>(
-                        tableId.toString(), taskWriterFactory, null, null);
+                        tableId.toString(), taskWriterFactory, null,
+                        null, flinkRowType, dirtyOptions, dirtySink);
                 writer.setup(getRuntimeContext(),
                         new CallbackCollector<>(
                                 writeResult -> collector.collect(new 
MultipleWriteResult(tableId, writeResult))),
@@ -206,6 +216,7 @@ public class IcebergMultipleStreamWriter extends 
IcebergProcessFunction<RecordWi
             } else { // only if second times schema will evolute
                 // Refresh new schema maybe cause previous file writer 
interrupted, so here should handle it
                 
multipleWriters.get(tableId).schemaEvolution(taskWriterFactory);
+                multipleWriters.get(tableId).setFlinkRowType(flinkRowType);
             }
 
         }
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index 8d936ffd8..761306a6e 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -26,15 +26,21 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.inlong.sort.base.dirty.DirtyData;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -48,6 +54,8 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
             CheckpointedFunction,
             SchemaEvolutionFunction<TaskWriterFactory<T>> {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(IcebergSingleStreamWriter.class);
+
     private static final long serialVersionUID = 1L;
 
     private final String fullTableName;
@@ -58,20 +66,28 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
     private transient TaskWriter<T> writer;
     private transient int subTaskId;
     private transient int attemptId;
-    @Nullable
-    private transient SinkMetricData metricData;
+    private @Nullable transient SinkMetricData metricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
+    private @Nullable RowType flinkRowType;
+    private final DirtyOptions dirtyOptions;
+    private @Nullable final DirtySink<Object> dirtySink;
 
     public IcebergSingleStreamWriter(
             String fullTableName,
             TaskWriterFactory<T> taskWriterFactory,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            @Nullable RowType flinkRowType,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink) {
         this.fullTableName = fullTableName;
         this.taskWriterFactory = taskWriterFactory;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.flinkRowType = flinkRowType;
+        this.dirtyOptions = dirtyOptions;
+        this.dirtySink = dirtySink;
     }
 
     @Override
@@ -81,7 +97,6 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
 
         // Initialize the task writer factory.
         this.taskWriterFactory.initialize(subTaskId, attemptId);
-
         // Initialize the task writer.
         this.writer = taskWriterFactory.create();
 
@@ -102,17 +117,38 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
     public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
         // close all open files and emit files to downstream committer operator
         emit(writer.complete());
-
         this.writer = taskWriterFactory.create();
     }
 
     @Override
-    public void processElement(T value)
-            throws Exception {
-        writer.write(value);
-
+    public void processElement(T value) throws Exception {
+        try {
+            writer.write(value);
+        } catch (Exception e) {
+            LOGGER.error(String.format("write error, raw data: %s", value), e);
+            if (!dirtyOptions.ignoreDirty()) {
+                throw e;
+            }
+            if (dirtySink != null) {
+                DirtyData.Builder<Object> builder = DirtyData.builder();
+                try {
+                    builder.setData(value)
+                            .setLabels(dirtyOptions.getLabels())
+                            .setLogTag(dirtyOptions.getLogTag())
+                            .setIdentifier(dirtyOptions.getIdentifier())
+                            .setRowType(flinkRowType)
+                            .setDirtyMessage(e.getMessage());
+                    dirtySink.invoke(builder.build());
+                } catch (Exception ex) {
+                    if (!dirtyOptions.ignoreSideOutputErrors()) {
+                        throw new RuntimeException(ex);
+                    }
+                    LOGGER.warn("Dirty sink failed", ex);
+                }
+            }
+        }
         if (metricData != null) {
-            metricData.invokeWithEstimate(value);
+            metricData.invokeWithEstimate(value == null ? "" : value);
         }
     }
 
@@ -131,6 +167,10 @@ public class IcebergSingleStreamWriter<T> extends 
IcebergProcessFunction<T, Writ
         }
     }
 
+    public void setFlinkRowType(@Nullable RowType flinkRowType) {
+        this.flinkRowType = flinkRowType;
+    }
+
     @Override
     public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
         if (metricData != null && metricStateListState != null) {
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
index 50388dcb4..0602868f2 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java
@@ -44,6 +44,7 @@ import org.junit.Test;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -116,7 +117,13 @@ public class IcebergNodeSqlParserTest extends 
AbstractTestBase {
                                 new FieldInfo("age", new IntFormatInfo())),
                         new FieldRelation(new FieldInfo("ts", new 
TimestampFormatInfo()),
                                 new FieldInfo("ts", new 
TimestampFormatInfo())));
-
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_iceberg");
         // set HIVE_CONF_DIR,or set uri and warehouse
         return new IcebergLoadNode(
                 "iceberg",
@@ -126,7 +133,7 @@ public class IcebergNodeSqlParserTest extends 
AbstractTestBase {
                 null,
                 null,
                 null,
-                null,
+                properties,
                 "inlong",
                 "inlong_iceberg",
                 null,
