This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 99947c98d [INLONG-6869][Sort] Supports dirty data side-output for 
elasticsearch sink (#6870)
99947c98d is described below

commit 99947c98d6437a694ccd3ffd1177f88add12dd31
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Wed Dec 14 15:23:38 2022 +0800

    [INLONG-6869][Sort] Supports dirty data side-output for elasticsearch sink 
(#6870)
---
 .../inlong/sort/base/dirty/DirtySinkHelper.java    | 108 ++++++++++++++++++
 .../apache/inlong/sort/base/dirty/DirtyType.java   |  16 +++
 .../sort/elasticsearch6/ElasticsearchSink.java     |  20 +++-
 .../table/Elasticsearch6DynamicSink.java           |  16 ++-
 .../table/Elasticsearch6DynamicSinkFactory.java    |  14 ++-
 .../sort/elasticsearch7/ElasticsearchSink.java     |  19 +++-
 .../table/Elasticsearch7DynamicSink.java           |  16 ++-
 .../table/Elasticsearch7DynamicSinkFactory.java    |  14 ++-
 .../sort/elasticsearch/ElasticsearchSinkBase.java  | 122 +++++++++++---------
 .../table/RowElasticsearchSinkFunction.java        | 124 ++++++++++++++-------
 .../sort/parser/ElasticsearchSqlParseTest.java     |  10 +-
 11 files changed, 369 insertions(+), 110 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
new file mode 100644
index 000000000..a962b974e
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.Serializable;
+
+/**
+ * Dirty sink helper, it helps dirty data sink for {@link DirtySink}
+ * @param <T>
+ */
+public class DirtySinkHelper<T> implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(DirtySinkHelper.class);
+
+    private DirtyOptions dirtyOptions;
+    private final @Nullable DirtySink<T> dirtySink;
+
+    public DirtySinkHelper(DirtyOptions dirtyOptions, @Nullable DirtySink<T> 
dirtySink) {
+        this.dirtyOptions = Preconditions.checkNotNull(dirtyOptions, 
"dirtyOptions is null");
+        this.dirtySink = dirtySink;
+    }
+
+    /**
+     * Open for dirty sink
+     *
+     * @param configuration The configuration that is used for dirty sink
+     */
+    public void open(Configuration configuration) {
+        if (dirtySink != null) {
+            try {
+                dirtySink.open(configuration);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Dirty data sink
+     * @param dirtyData The dirty data
+     * @param dirtyType The dirty type {@link DirtyType}
+     * @param e The cause of dirty data
+     */
+    public void invoke(T dirtyData, DirtyType dirtyType, Throwable e) {
+        if (!dirtyOptions.ignoreDirty()) {
+            RuntimeException ex;
+            if (e instanceof RuntimeException) {
+                ex = (RuntimeException) e;
+            } else {
+                ex = new RuntimeException(e);
+            }
+            throw ex;
+        }
+        if (dirtySink != null) {
+            DirtyData.Builder<T> builder = DirtyData.builder();
+            try {
+                builder.setData(dirtyData)
+                        .setDirtyType(dirtyType)
+                        .setLabels(dirtyOptions.getLabels())
+                        .setLogTag(dirtyOptions.getLogTag())
+                        .setDirtyMessage(e.getMessage())
+                        .setIdentifier(dirtyOptions.getIdentifier());
+                dirtySink.invoke(builder.build());
+            } catch (Exception ex) {
+                if (!dirtyOptions.ignoreSideOutputErrors()) {
+                    throw new RuntimeException(ex);
+                }
+                LOGGER.warn("Dirty sink failed", ex);
+            }
+        }
+    }
+
+    public void setDirtyOptions(DirtyOptions dirtyOptions) {
+        this.dirtyOptions = dirtyOptions;
+    }
+
+    public DirtyOptions getDirtyOptions() {
+        return dirtyOptions;
+    }
+
+    @Nullable
+    public DirtySink<T> getDirtySink() {
+        return dirtySink;
+    }
+}
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
index 0637725c3..89789872a 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java
@@ -88,6 +88,22 @@ public enum DirtyType {
      * Extract RowData error
      */
     EXTRACT_ROWDATA_ERROR("ExtractRowDataError"),
+    /**
+     * Index generate error
+     */
+    INDEX_GENERATE_ERROR("IndexGenerateError"),
+    /**
+     * Index id generate error
+     */
+    INDEX_ID_GENERATE_ERROR("IndexIdGenerateError"),
+    /**
+     * Index routing error
+     */
+    INDEX_ROUTING_ERROR("IndexRoutingError"),
+    /**
+     * Document parse error
+     */
+    DOCUMENT_PARSE_ERROR("DocumentParseError"),
     ;
 
     private final String format;
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
 
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
index f33fb7fa4..5ada044cb 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/ElasticsearchSink.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandl
 import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
 import org.apache.flink.util.Preconditions;
 import org.apache.http.HttpHost;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
 import org.elasticsearch.action.ActionRequest;
@@ -70,14 +71,15 @@ public class ElasticsearchSink<T> extends 
ElasticsearchSinkBase<T, RestHighLevel
             ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
             ActionRequestFailureHandler failureHandler,
             RestClientFactory restClientFactory,
-            String inlongMetric) {
-
+            String inlongMetric,
+            DirtySinkHelper<Object> dirtySinkHelper) {
         super(
                 new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),
                 bulkRequestsConfig,
                 elasticsearchSinkFunction,
                 failureHandler,
-                inlongMetric);
+                inlongMetric,
+                dirtySinkHelper);
     }
 
     /**
@@ -96,6 +98,7 @@ public class ElasticsearchSink<T> extends 
ElasticsearchSinkBase<T, RestHighLevel
         private RestClientFactory restClientFactory = restClientBuilder -> {
         };
         private String inlongMetric = null;
+        private DirtySinkHelper<Object> dirtySinkHelper;
 
         /**
          * Creates a new {@code ElasticsearchSink} that connects to the 
cluster using a {@link
@@ -120,6 +123,14 @@ public class ElasticsearchSink<T> extends 
ElasticsearchSinkBase<T, RestHighLevel
             this.inlongMetric = inlongMetric;
         }
 
+        /**
+         * Set dirty sink helper
+         * @param dirtySinkHelper The dirty sink helper
+         */
+        public void setDirtySinkHelper(DirtySinkHelper<Object> 
dirtySinkHelper) {
+            this.dirtySinkHelper = dirtySinkHelper;
+        }
+
         /**
          * Sets the maximum number of actions to buffer for each bulk request. 
You can pass -1 to
          * disable it.
@@ -244,7 +255,8 @@ public class ElasticsearchSink<T> extends 
ElasticsearchSinkBase<T, RestHighLevel
                     elasticsearchSinkFunction,
                     failureHandler,
                     restClientFactory,
-                    inlongMetric);
+                    inlongMetric,
+                    dirtySinkHelper);
         }
 
         @Override
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
 
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
index 0b0a3cf58..61dfdb3cb 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSink.java
@@ -35,6 +35,7 @@ import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
 import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
 import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
@@ -67,6 +68,7 @@ final class Elasticsearch6DynamicSink implements 
DynamicTableSink {
     private final String inlongMetric;
     private final String auditHostAndPorts;
     private final ElasticSearchBuilderProvider builderProvider;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
 
     // --------------------------------------------------------------
     // Hack to make configuration testing possible.
@@ -83,8 +85,10 @@ final class Elasticsearch6DynamicSink implements 
DynamicTableSink {
             Elasticsearch6Configuration config,
             TableSchema schema,
             String inlongMetric,
-            String auditHostAndPorts) {
-        this(format, config, schema, (ElasticsearchSink.Builder::new), 
inlongMetric, auditHostAndPorts);
+            String auditHostAndPorts,
+            DirtySinkHelper<Object> dirtySinkHelper) {
+        this(format, config, schema, (ElasticsearchSink.Builder::new),
+                inlongMetric, auditHostAndPorts, dirtySinkHelper);
     }
 
     Elasticsearch6DynamicSink(
@@ -93,13 +97,15 @@ final class Elasticsearch6DynamicSink implements 
DynamicTableSink {
             TableSchema schema,
             ElasticSearchBuilderProvider builderProvider,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            DirtySinkHelper<Object> dirtySinkHelper) {
         this.format = format;
         this.schema = schema;
         this.config = config;
         this.builderProvider = builderProvider;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.dirtySinkHelper = dirtySinkHelper;
     }
 
     @Override
@@ -134,7 +140,8 @@ final class Elasticsearch6DynamicSink implements 
DynamicTableSink {
                             RoutingExtractor.createRoutingExtractor(
                                     schema, 
config.getRoutingField().orElse(null)),
                             inlongMetric,
-                            auditHostAndPorts);
+                            auditHostAndPorts,
+                            dirtySinkHelper);
 
             final ElasticsearchSink.Builder<RowData> builder =
                     builderProvider.createBuilder(config.getHosts(), 
upsertFunction);
@@ -145,6 +152,7 @@ final class Elasticsearch6DynamicSink implements 
DynamicTableSink {
             builder.setBulkFlushInterval(config.getBulkFlushInterval());
             builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
             builder.setInLongMetric(inlongMetric);
+            builder.setDirtySinkHelper(dirtySinkHelper);
             
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
             
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
             
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
 
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
index 01833cd3c..501d3ccd7 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-6/src/main/java/org/apache/inlong/sort/elasticsearch6/table/Elasticsearch6DynamicSinkFactory.java
@@ -32,6 +32,10 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.StringUtils;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils;
 
 import java.util.Set;
@@ -39,6 +43,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static 
org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
@@ -99,7 +104,7 @@ public class Elasticsearch6DynamicSinkFactory implements 
DynamicTableSinkFactory
         final EncodingFormat<SerializationSchema<RowData>> format =
                 
helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
 
-        helper.validate();
+        helper.validateExcept(DIRTY_PREFIX);
         Configuration configuration = new Configuration();
         
context.getCatalogTable().getOptions().forEach(configuration::setString);
         Elasticsearch6Configuration config =
@@ -110,9 +115,12 @@ public class Elasticsearch6DynamicSinkFactory implements 
DynamicTableSinkFactory
         String inlongMetric = 
helper.getOptions().getOptional(INLONG_METRIC).orElse(null);
 
         String auditHostAndPorts = 
helper.getOptions().getOptional(INLONG_AUDIT).orElse(null);
-
+        final DirtyOptions dirtyOptions = 
DirtyOptions.fromConfig(helper.getOptions());
+        final DirtySink<Object> dirtySink = 
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
+        final DirtySinkHelper<Object> dirtySinkHelper = new 
DirtySinkHelper<>(dirtyOptions, dirtySink);
         return new Elasticsearch6DynamicSink(
-                format, config, 
TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric, 
auditHostAndPorts);
+                format, config, 
TableSchemaUtils.getPhysicalSchema(tableSchema),
+                inlongMetric, auditHostAndPorts, dirtySinkHelper);
     }
 
     private void validate(Elasticsearch6Configuration config, Configuration 
originalConfiguration) {
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
 
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
index 10b7cdfb8..9dd1dd90f 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/ElasticsearchSink.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sort.elasticsearch7;
 
 import org.apache.flink.annotation.PublicEvolving;
 import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkBase;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
 import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
@@ -71,14 +72,16 @@ public class ElasticsearchSink<T> extends 
ElasticsearchSinkBase<T, RestHighLevel
             ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
             ActionRequestFailureHandler failureHandler,
             RestClientFactory restClientFactory,
-            String inlongMetric) {
+            String inlongMetric,
+            DirtySinkHelper<Object> dirtySinkHelper) {
 
         super(
                 new Elasticsearch7ApiCallBridge(httpHosts, restClientFactory),
                 bulkRequestsConfig,
                 elasticsearchSinkFunction,
                 failureHandler,
-                inlongMetric);
+                inlongMetric,
+                dirtySinkHelper);
     }
 
     /**
@@ -97,6 +100,7 @@ public class ElasticsearchSink<T> extends 
ElasticsearchSinkBase<T, RestHighLevel
         private RestClientFactory restClientFactory = restClientBuilder -> {
         };
         private String inlongMetric = null;
+        private DirtySinkHelper<Object> dirtySinkHelper;
 
         /**
          * Creates a new {@code ElasticsearchSink} that connects to the 
cluster using a {@link
@@ -121,6 +125,14 @@ public class ElasticsearchSink<T> extends 
ElasticsearchSinkBase<T, RestHighLevel
             this.inlongMetric = inlongMetric;
         }
 
+        /**
+         * Set dirty sink helper
+         * @param dirtySinkHelper The dirty sink helper
+         */
+        public void setDirtySinkHelper(DirtySinkHelper<Object> 
dirtySinkHelper) {
+            this.dirtySinkHelper = dirtySinkHelper;
+        }
+
         /**
          * Sets the maximum number of actions to buffer for each bulk request. 
You can pass -1 to
          * disable it.
@@ -245,7 +257,8 @@ public class ElasticsearchSink<T> extends 
ElasticsearchSinkBase<T, RestHighLevel
                     elasticsearchSinkFunction,
                     failureHandler,
                     restClientFactory,
-                    inlongMetric);
+                    inlongMetric,
+                    dirtySinkHelper);
         }
 
         @Override
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
 
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
index 29e2f1795..009283330 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSink.java
@@ -35,6 +35,7 @@ import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
 import org.apache.http.client.CredentialsProvider;
 import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.elasticsearch.table.IndexGeneratorFactory;
 import org.apache.inlong.sort.elasticsearch.table.KeyExtractor;
 import org.apache.inlong.sort.elasticsearch.table.RequestFactory;
@@ -68,6 +69,7 @@ final class Elasticsearch7DynamicSink implements 
DynamicTableSink {
     private final String inlongMetric;
     private final String auditHostAndPorts;
     private final ElasticSearchBuilderProvider builderProvider;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
 
     // --------------------------------------------------------------
     // Hack to make configuration testing possible.
@@ -84,8 +86,10 @@ final class Elasticsearch7DynamicSink implements 
DynamicTableSink {
             Elasticsearch7Configuration config,
             TableSchema schema,
             String inlongMetric,
-            String auditHostAndPorts) {
-        this(format, config, schema, (ElasticsearchSink.Builder::new), 
inlongMetric, auditHostAndPorts);
+            String auditHostAndPorts,
+            DirtySinkHelper<Object> dirtySinkHelper) {
+        this(format, config, schema, (ElasticsearchSink.Builder::new),
+                inlongMetric, auditHostAndPorts, dirtySinkHelper);
     }
 
     Elasticsearch7DynamicSink(
@@ -94,13 +98,15 @@ final class Elasticsearch7DynamicSink implements 
DynamicTableSink {
             TableSchema schema,
             ElasticSearchBuilderProvider builderProvider,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            DirtySinkHelper<Object> dirtySinkHelper) {
         this.format = format;
         this.schema = schema;
         this.config = config;
         this.builderProvider = builderProvider;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.dirtySinkHelper = dirtySinkHelper;
     }
 
     @Override
@@ -135,7 +141,8 @@ final class Elasticsearch7DynamicSink implements 
DynamicTableSink {
                             RoutingExtractor.createRoutingExtractor(
                                     schema, 
config.getRoutingField().orElse(null)),
                             inlongMetric,
-                            auditHostAndPorts);
+                            auditHostAndPorts,
+                            dirtySinkHelper);
 
             final ElasticsearchSink.Builder<RowData> builder =
                     builderProvider.createBuilder(config.getHosts(), 
upsertFunction);
@@ -146,6 +153,7 @@ final class Elasticsearch7DynamicSink implements 
DynamicTableSink {
             builder.setBulkFlushInterval(config.getBulkFlushInterval());
             builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
             builder.setInLongMetric(inlongMetric);
+            builder.setDirtySinkHelper(dirtySinkHelper);
             
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
             
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
             
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
 
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
index 1abbf1705..a1bfaa90e 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-7/src/main/java/org/apache/inlong/sort/elasticsearch7/table/Elasticsearch7DynamicSinkFactory.java
@@ -32,6 +32,10 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.SerializationFormatFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.StringUtils;
+import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.sink.DirtySink;
+import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils;
 import org.apache.inlong.sort.elasticsearch.table.ElasticsearchValidationUtils;
 
 import java.util.Set;
@@ -39,6 +43,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.inlong.sort.base.Constants.DIRTY_PREFIX;
 import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 import static 
org.apache.inlong.sort.elasticsearch.table.ElasticsearchOptions.BULK_FLASH_MAX_SIZE_OPTION;
@@ -99,7 +104,7 @@ public class Elasticsearch7DynamicSinkFactory implements 
DynamicTableSinkFactory
         final EncodingFormat<SerializationSchema<RowData>> format =
                 
helper.discoverEncodingFormat(SerializationFormatFactory.class, FORMAT_OPTION);
 
-        helper.validate();
+        helper.validateExcept(DIRTY_PREFIX);
         Configuration configuration = new Configuration();
         
context.getCatalogTable().getOptions().forEach(configuration::setString);
         Elasticsearch7Configuration config =
@@ -110,9 +115,12 @@ public class Elasticsearch7DynamicSinkFactory implements 
DynamicTableSinkFactory
         String inlongMetric = 
helper.getOptions().getOptional(INLONG_METRIC).orElse(null);
 
         String auditHostAndPorts = 
helper.getOptions().getOptional(INLONG_AUDIT).orElse(null);
-
+        final DirtyOptions dirtyOptions = 
DirtyOptions.fromConfig(helper.getOptions());
+        final DirtySink<Object> dirtySink = 
DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions);
+        final DirtySinkHelper<Object> dirtySinkHelper = new 
DirtySinkHelper<>(dirtyOptions, dirtySink);
         return new Elasticsearch7DynamicSink(
-                format, config, 
TableSchemaUtils.getPhysicalSchema(tableSchema), inlongMetric, 
auditHostAndPorts);
+                format, config, 
TableSchemaUtils.getPhysicalSchema(tableSchema),
+                inlongMetric, auditHostAndPorts, dirtySinkHelper);
     }
 
     private void validate(Elasticsearch7Configuration config, Configuration 
originalConfiguration) {
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index 50fc2a8f0..c21685de0 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -28,23 +28,32 @@ import 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.IgnoringFailureHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.InstantiationUtil;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.action.ActionRequest;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
@@ -76,6 +85,8 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
 
     public static final String CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS = 
"bulk.flush.max.actions";
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchSinkBase.class);
+
     // ------------------------------------------------------------------------
     // Internal bulk processor configuration
     // ------------------------------------------------------------------------
@@ -125,6 +136,7 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
      */
     private final AtomicReference<Throwable> failureThrowable = new 
AtomicReference<>();
     private final String inlongMetric;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
     /**
      * If true, the producer will wait until all outstanding action requests 
have been sent to
      * Elasticsearch.
@@ -170,8 +182,10 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
             Map<String, String> userConfig,
             ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
             ActionRequestFailureHandler failureHandler,
-            String inlongMetric) {
+            String inlongMetric,
+            DirtySinkHelper<Object> dirtySinkHelper) {
         this.inlongMetric = inlongMetric;
+        this.dirtySinkHelper = dirtySinkHelper;
         this.callBridge = checkNotNull(callBridge);
         this.elasticsearchSinkFunction = 
checkNotNull(elasticsearchSinkFunction);
         this.failureHandler = checkNotNull(failureHandler);
@@ -275,9 +289,9 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
         if (metricOption != null) {
             sinkMetricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
         }
-
+        dirtySinkHelper.open(parameters);
         callBridge.verifyClientConnection(client);
-        bulkProcessor = buildBulkProcessor(new 
BulkProcessorListener(sinkMetricData));
+        bulkProcessor = buildBulkProcessor(new 
BulkProcessorListener(sinkMetricData, dirtySinkHelper));
         requestIndexer =
                 callBridge.createBulkProcessorIndexer(
                         bulkProcessor, flushOnCheckpoint, numPendingRequests);
@@ -462,10 +476,13 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
 
     private class BulkProcessorListener implements BulkProcessor.Listener {
 
-        private SinkMetricData sinkMetricData;
+        private final SinkMetricData sinkMetricData;
+        private final DirtySinkHelper<Object> dirtySinkHelper;
 
-        public BulkProcessorListener(SinkMetricData sinkMetricData) {
+        public BulkProcessorListener(SinkMetricData sinkMetricData,
+                DirtySinkHelper<Object> dirtySinkHelper) {
             this.sinkMetricData = sinkMetricData;
+            this.dirtySinkHelper = dirtySinkHelper;
         }
 
         @Override
@@ -477,42 +494,16 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
             if (response.hasFailures()) {
                 BulkItemResponse itemResponse;
                 Throwable failure;
-                RestStatus restStatus;
-                DocWriteRequest actionRequest;
-
+                int restStatus;
                 try {
                     for (int i = 0; i < response.getItems().length; i++) {
                         itemResponse = response.getItems()[i];
                         failure = 
callBridge.extractFailureCauseFromBulkItemResponse(itemResponse);
                         if (failure != null) {
-                            restStatus = itemResponse.getFailure().getStatus();
-                            actionRequest = request.requests().get(i);
-                            if (sinkMetricData != null) {
-                                sinkMetricData.invokeDirty(1, 0);
-                            }
-                            if (restStatus == null) {
-                                if (actionRequest instanceof ActionRequest) {
-                                    failureHandler.onFailure(
-                                            (ActionRequest) actionRequest,
-                                            failure,
-                                            -1,
-                                            failureRequestIndexer);
-                                } else {
-                                    throw new UnsupportedOperationException(
-                                            "The sink currently only supports 
ActionRequests");
-                                }
-                            } else {
-                                if (actionRequest instanceof ActionRequest) {
-                                    failureHandler.onFailure(
-                                            (ActionRequest) actionRequest,
-                                            failure,
-                                            restStatus.getStatus(),
-                                            failureRequestIndexer);
-                                } else {
-                                    throw new UnsupportedOperationException(
-                                            "The sink currently only supports 
ActionRequests");
-                                }
-                            }
+                            restStatus = itemResponse.getFailure().getStatus() 
!= null
+                                    ? 
itemResponse.getFailure().getStatus().getStatus()
+                                    : -1;
+                            handleFailure(request.requests().get(i), 
restStatus, failure);
                         }
                     }
                 } catch (Throwable t) {
@@ -521,7 +512,6 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
                     failureThrowable.compareAndSet(null, t);
                 }
             }
-
             if (flushOnCheckpoint) {
                 numPendingRequests.getAndAdd(-request.numberOfActions());
             }
@@ -530,27 +520,59 @@ public abstract class ElasticsearchSinkBase<T, C extends 
AutoCloseable> extends
         @Override
         public void afterBulk(long executionId, BulkRequest request, Throwable 
failure) {
             try {
-                for (DocWriteRequest writeRequest : request.requests()) {
-                    if (sinkMetricData != null) {
-                        sinkMetricData.invokeDirty(1, 0);
-                    }
-                    if (writeRequest instanceof ActionRequest) {
-                        failureHandler.onFailure(
-                                (ActionRequest) writeRequest, failure, -1, 
failureRequestIndexer);
-                    } else {
-                        throw new UnsupportedOperationException(
-                                "The sink currently only supports 
ActionRequests");
-                    }
+                for (DocWriteRequest<?> writeRequest : request.requests()) {
+                    handleFailure(writeRequest, -1, failure);
                 }
             } catch (Throwable t) {
                 // fail the sink and skip the rest of the items
                 // if the failure handler decides to throw an exception
                 failureThrowable.compareAndSet(null, t);
             }
-
             if (flushOnCheckpoint) {
                 numPendingRequests.getAndAdd(-request.numberOfActions());
             }
         }
+
+        private void handleFailure(DocWriteRequest<?> writeRequest, int 
restStatus, Throwable failure)
+                throws Throwable {
+            // Only supports dirty data sink when the failureHandler is 
IgnoringFailureHandler
+            if (failureHandler instanceof IgnoringFailureHandler) {
+                if (!(writeRequest instanceof ActionRequest)) {
+                    LOGGER.error("The sink currently only supports 
ActionRequests");
+                    dirtySinkHelper.invoke(writeRequest.id(), 
DirtyType.UNSUPPORTED_DATA_TYPE,
+                            new UnsupportedOperationException("The sink 
currently only supports ActionRequests"));
+                    if (sinkMetricData != null) {
+                        sinkMetricData.invokeDirty(1, 
writeRequest.id().getBytes(StandardCharsets.UTF_8).length);
+                    }
+                } else if (ExceptionUtils.findThrowable(failure, 
ElasticsearchParseException.class).isPresent()) {
+                    String dirtyData;
+                    if (writeRequest instanceof UpdateRequest) {
+                        dirtyData = ((UpdateRequest) 
writeRequest).doc().source().utf8ToString();
+                    } else if (writeRequest instanceof IndexRequest) {
+                        dirtyData = ((IndexRequest) 
writeRequest).source().utf8ToString();
+                    } else {
+                        dirtyData = writeRequest.id();
+                    }
+                    LOGGER.error(String.format("Elasticsearch parse exception, 
raw data: %s", dirtyData), failure);
+                    dirtySinkHelper.invoke(dirtyData, 
DirtyType.DOCUMENT_PARSE_ERROR, failure);
+                    if (sinkMetricData != null) {
+                        sinkMetricData.invokeDirty(1, 
dirtyData.getBytes(StandardCharsets.UTF_8).length);
+                    }
+                } else {
+                    throw failure;
+                }
+            } else {
+                if (writeRequest instanceof ActionRequest) {
+                    failureHandler.onFailure(
+                            (ActionRequest) writeRequest,
+                            failure,
+                            restStatus,
+                            failureRequestIndexer);
+                } else {
+                    throw new UnsupportedOperationException(
+                            "The sink currently only supports ActionRequests");
+                }
+            }
+        }
     }
 }
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index 4ff80bcca..fe605e3c1 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -26,13 +26,14 @@ import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
+import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
@@ -42,9 +43,12 @@ import org.elasticsearch.action.delete.DeleteRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.update.UpdateRequest;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Objects;
 import java.util.function.Function;
 
@@ -57,6 +61,8 @@ public class RowElasticsearchSinkFunction implements 
ElasticsearchSinkFunction<R
 
     private static final long serialVersionUID = 1L;
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RowElasticsearchSinkFunction.class);
+
     private final IndexGenerator indexGenerator;
     private final String docType;
     private final SerializationSchema<RowData> serializationSchema;
@@ -73,6 +79,7 @@ public class RowElasticsearchSinkFunction implements 
ElasticsearchSinkFunction<R
     private transient RuntimeContext runtimeContext;
 
     private SinkMetricData sinkMetricData;
+    private final DirtySinkHelper<Object> dirtySinkHelper;
 
     public RowElasticsearchSinkFunction(
             IndexGenerator indexGenerator,
@@ -83,7 +90,8 @@ public class RowElasticsearchSinkFunction implements 
ElasticsearchSinkFunction<R
             Function<RowData, String> createKey,
             @Nullable Function<RowData, String> createRouting,
             String inlongMetric,
-            String auditHostAndPorts) {
+            String auditHostAndPorts,
+            DirtySinkHelper<Object> dirtySinkHelper) {
         this.indexGenerator = Preconditions.checkNotNull(indexGenerator);
         this.docType = docType;
         this.serializationSchema = 
Preconditions.checkNotNull(serializationSchema);
@@ -93,6 +101,7 @@ public class RowElasticsearchSinkFunction implements 
ElasticsearchSinkFunction<R
         this.createRouting = createRouting;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.dirtySinkHelper = dirtySinkHelper;
     }
 
     @Override
@@ -146,55 +155,94 @@ public class RowElasticsearchSinkFunction implements 
ElasticsearchSinkFunction<R
 
     @Override
     public void process(RowData element, RuntimeContext ctx, RequestIndexer 
indexer) {
+        final byte[] document;
+        try {
+            document = serializationSchema.serialize(element);
+        } catch (Exception e) {
+            LOGGER.error(String.format("Serialize error, raw data: %s", 
element), e);
+            dirtySinkHelper.invoke(element, DirtyType.SERIALIZE_ERROR, e);
+            if (sinkMetricData != null) {
+                sinkMetricData.invokeDirty(1, 
element.toString().getBytes(StandardCharsets.UTF_8).length);
+            }
+            return;
+        }
+        final String key;
+        try {
+            key = createKey.apply(element);
+        } catch (Exception e) {
+            LOGGER.error(String.format("Generate index id error, raw data: 
%s", element), e);
+            dirtySinkHelper.invoke(element, DirtyType.INDEX_ID_GENERATE_ERROR, 
e);
+            if (sinkMetricData != null) {
+                sinkMetricData.invokeDirty(1, document.length);
+            }
+            return;
+        }
+        final String index;
+        try {
+            index = indexGenerator.generate(element);
+        } catch (Exception e) {
+            LOGGER.error(String.format("Generate index error, raw data: %s", 
element), e);
+            dirtySinkHelper.invoke(element, DirtyType.INDEX_GENERATE_ERROR, e);
+            if (sinkMetricData != null) {
+                sinkMetricData.invokeDirty(1, document.length);
+            }
+            return;
+        }
+        addDocument(element, key, index, document, indexer);
+    }
+
+    private void addDocument(RowData element, String key, String index, byte[] 
document, RequestIndexer indexer) {
+        DocWriteRequest<?> request;
         switch (element.getRowKind()) {
             case INSERT:
             case UPDATE_AFTER:
-                processUpsert(element, indexer);
+                if (key != null) {
+                    request = requestFactory.createUpdateRequest(index, 
docType, key, contentType, document);
+                    if (addRouting(request, element, document)) {
+                        indexer.add((UpdateRequest) request);
+                        sendMetrics(document);
+                    }
+                } else {
+                    request = requestFactory.createIndexRequest(index, 
docType, key, contentType, document);
+                    if (addRouting(request, element, document)) {
+                        indexer.add((IndexRequest) request);
+                        sendMetrics(document);
+                    }
+                }
                 break;
             case UPDATE_BEFORE:
             case DELETE:
-                processDelete(element, indexer);
+                request = requestFactory.createDeleteRequest(index, docType, 
key);
+                if (addRouting(request, element, document)) {
+                    indexer.add((DeleteRequest) request);
+                    sendMetrics(document);
+                }
                 break;
             default:
-                throw new TableException("Unsupported message kind: " + 
element.getRowKind());
+                LOGGER.error(String.format("The type of element should be 
'RowData' only, raw data: %s", element));
+                dirtySinkHelper.invoke(element, 
DirtyType.UNSUPPORTED_DATA_TYPE,
+                        new RuntimeException("The type of element should be 
'RowData' only."));
+                if (sinkMetricData != null) {
+                    sinkMetricData.invokeDirty(1, document.length);
+                }
         }
     }
 
-    private void processUpsert(RowData row, RequestIndexer indexer) {
-        final byte[] document = serializationSchema.serialize(row);
-        final String key = createKey.apply(row);
-        sendMetrics(document);
-        if (key != null) {
-            final UpdateRequest updateRequest =
-                    requestFactory.createUpdateRequest(
-                            indexGenerator.generate(row), docType, key, 
contentType, document);
-            addRouting(updateRequest, row);
-            indexer.add(updateRequest);
-        } else {
-            final IndexRequest indexRequest =
-                    requestFactory.createIndexRequest(
-                            indexGenerator.generate(row), docType, key, 
contentType, document);
-            addRouting(indexRequest, row);
-            indexer.add(indexRequest);
-        }
-    }
-
-    private void processDelete(RowData row, RequestIndexer indexer) {
-        // the serialization is just for metrics
-        final byte[] document = serializationSchema.serialize(row);
-        sendMetrics(document);
-        final String key = createKey.apply(row);
-        final DeleteRequest deleteRequest =
-                
requestFactory.createDeleteRequest(indexGenerator.generate(row), docType, key);
-        addRouting(deleteRequest, row);
-        indexer.add(deleteRequest);
-    }
-
-    private void addRouting(DocWriteRequest<?> request, RowData row) {
+    private boolean addRouting(DocWriteRequest<?> request, RowData row, byte[] 
document) {
         if (null != createRouting) {
-            String routing = createRouting.apply(row);
-            request.routing(routing);
+            try {
+                String routing = createRouting.apply(row);
+                request.routing(routing);
+            } catch (Exception e) {
+                LOGGER.error(String.format("Routing error, raw data: %s", 
row), e);
+                dirtySinkHelper.invoke(row, DirtyType.INDEX_ROUTING_ERROR, e);
+                if (sinkMetricData != null) {
+                    sinkMetricData.invokeDirty(1, document.length);
+                }
+                return false;
+            }
         }
+        return true;
     }
 
     @Override
diff --git 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
index 9e7d4e74d..bf4ef0505 100644
--- 
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
+++ 
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/ElasticsearchSqlParseTest.java
@@ -21,6 +21,7 @@ package org.apache.inlong.sort.parser;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -74,8 +75,15 @@ public abstract class ElasticsearchSqlParseTest extends 
AbstractTestBase {
                                 new FieldInfo("name", new 
StringFormatInfo())));
         CsvFormat csvFormat = new CsvFormat();
         csvFormat.setDisableQuoteCharacter(true);
+        Map<String, String> properties = new LinkedHashMap<>();
+        properties.put("dirty.side-output.connector", "log");
+        properties.put("dirty.ignore", "true");
+        properties.put("dirty.side-output.enable", "true");
+        properties.put("dirty.side-output.format", "csv");
+        properties.put("dirty.side-output.labels",
+                
"SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&table=test");
         return new ElasticsearchLoadNode("2", "kafka_output", fields, 
relations, null, null,
-                2, null,
+                2, properties,
                 "test", "http://localhost:9200";,
                 "elastic", "my_password", null, "age", version);
     }

Reply via email to