This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c6908e2b [INLONG-7292][Sort] Accurately detect and archive dirty data for Doris/StarRocks/JDBC (#7289)
9c6908e2b is described below

commit 9c6908e2b236e35f9b102499243f6f9a191d466a
Author: Yizhou Yang <32808678+yizhou-y...@users.noreply.github.com>
AuthorDate: Fri Feb 3 10:30:53 2023 +0800

    [INLONG-7292][Sort] Accurately detect and archive dirty data for Doris/StarRocks/JDBC (#7289)
---
 .../inlong/sort/base/dirty/DirtySinkHelper.java    | 124 ++++++++++++++++++---
 .../inlong/sort/base/dirty/RegexReplaceTest.java   |  38 +++++++
 .../table/DorisDynamicSchemaOutputFormat.java      |   8 +-
 .../internal/JdbcMultiBatchingOutputFormat.java    |   4 +-
 .../starrocks/manager/StarRocksSinkManager.java    |   9 +-
 5 files changed, 162 insertions(+), 21 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
index e815db856..3f3b14b6a 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java
@@ -28,7 +28,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 /**
  * Dirty sink helper, it helps dirty data sink for {@link DirtySink}
@@ -38,6 +45,7 @@ public class DirtySinkHelper<T> implements Serializable {
 
     private static final long serialVersionUID = 1L;
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DirtySinkHelper.class);
+    static final Pattern REGEX_PATTERN = 
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
 
     private DirtyOptions dirtyOptions;
     private final @Nullable DirtySink<T> dirtySink;
@@ -97,10 +105,8 @@ public class DirtySinkHelper<T> implements Serializable {
         }
     }
 
-    public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e,
+    public void invokeMultiple(String tableIdentifier, T dirtyData, DirtyType 
dirtyType, Throwable e,
             String sinkMultipleFormat) {
-        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
-                (JsonDynamicSchemaFormat) 
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
         if (!dirtyOptions.ignoreDirty()) {
             RuntimeException ex;
             if (e instanceof RuntimeException) {
@@ -110,22 +116,60 @@ public class DirtySinkHelper<T> implements Serializable {
             }
             throw ex;
         }
-        if (dirtySink != null) {
-            JsonNode rootNode;
-            DirtyData.Builder<T> builder = DirtyData.builder();
-            try {
+
+        JsonDynamicSchemaFormat jsonDynamicSchemaFormat =
+                (JsonDynamicSchemaFormat) 
DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat);
+        JsonNode rootNode = null;
+        String[] actualIdentifier = tableIdentifier.split("\\.");;
+
+        try {
+            // for rowdata where identifier is not the first element, append 
identifier and SEPARATOR before it.
+            if (dirtyData instanceof RowData) {
                 rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) 
dirtyData).getBinary(0));
-            } catch (Exception ex) {
-                invoke(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
-                return;
+                handleDirty(dirtyType, e, null, rootNode, 
jsonDynamicSchemaFormat, dirtyData);
+            } else if (dirtyData instanceof JsonNode) {
+                rootNode = (JsonNode) dirtyData;
+                handleDirty(dirtyType, e, null, rootNode, 
jsonDynamicSchemaFormat, dirtyData);
+            } else if (dirtyData instanceof String) {
+                handleDirty(dirtyType, e, actualIdentifier, null, 
jsonDynamicSchemaFormat, dirtyData);
+            } else {
+                throw new Exception("unidentified dirty data " + dirtyData);
             }
+        } catch (Exception ex) {
+            LOGGER.warn("parse dirty data {} of class {} failed", dirtyData, 
dirtyData.getClass());
+            invoke(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
+        }
+    }
+
+    private void handleDirty(DirtyType dirtyType, Throwable e,
+            String[] actualIdentifier, JsonNode rootNode, 
JsonDynamicSchemaFormat jsonDynamicSchemaFormat,
+            T dirtyData) {
+        if (dirtySink != null) {
+            DirtyData.Builder<T> builder = DirtyData.builder();
             try {
-                builder.setData(dirtyData)
-                        .setDirtyType(dirtyType)
-                        .setLabels(jsonDynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getLabels()))
-                        .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getLogTag()))
-                        .setDirtyMessage(e.getMessage())
-                        .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getIdentifier()));
+                if (rootNode != null) {
+                    String labels = regexReplace(dirtyOptions.getLabels(), 
dirtyType, e.getMessage(), null);
+                    String logTag = regexReplace(dirtyOptions.getLogTag(), 
dirtyType, e.getMessage(), null);
+                    String identifier = 
regexReplace(dirtyOptions.getIdentifier(), dirtyType, e.getMessage(), null);
+                    builder.setData(dirtyData)
+                            .setDirtyType(dirtyType)
+                            .setLabels(jsonDynamicSchemaFormat.parse(rootNode, 
labels))
+                            .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, 
logTag))
+                            .setDirtyMessage(e.getMessage())
+                            
.setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, identifier));
+                } else {
+                    // for dirty data without proper rootnode, parse 
completely into string literals
+                    String labels = regexReplace(dirtyOptions.getLabels(), 
dirtyType, e.getMessage(), actualIdentifier);
+                    String logTag = regexReplace(dirtyOptions.getLogTag(), 
dirtyType, e.getMessage(), actualIdentifier);
+                    String identifier =
+                            regexReplace(dirtyOptions.getIdentifier(), 
dirtyType, e.getMessage(), actualIdentifier);
+                    builder.setData(dirtyData)
+                            .setDirtyType(dirtyType)
+                            .setLabels(labels)
+                            .setLogTag(logTag)
+                            .setDirtyMessage(e.getMessage())
+                            .setIdentifier(identifier);
+                }
                 dirtySink.invoke(builder.build());
             } catch (Exception ex) {
                 if (!dirtyOptions.ignoreSideOutputErrors()) {
@@ -136,6 +180,54 @@ public class DirtySinkHelper<T> implements Serializable {
         }
     }
 
+    public static String regexReplace(String pattern, DirtyType dirtyType,
+            String dirtyMessage, String[] actualIdentifier) throws IOException 
{
+
+        if (pattern == null) {
+            return null;
+        }
+
+        final String DIRTY_TYPE_KEY = "DIRTY_TYPE";
+        final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE";
+        final String SYSTEM_TIME_KEY = "SYSTEM_TIME";
+        final DateTimeFormatter DATE_TIME_FORMAT = 
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+        Map<String, String> paramMap = new HashMap<>();
+        paramMap.put(SYSTEM_TIME_KEY, 
DATE_TIME_FORMAT.format(LocalDateTime.now()));
+        paramMap.put(DIRTY_TYPE_KEY, dirtyType.format());
+        paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage);
+
+        Matcher matcher = REGEX_PATTERN.matcher(pattern);
+
+        // if RootNode is not available, generate a complete paramMap with 
{database} {table},etc.
+        if (actualIdentifier != null) {
+            int i = 0;
+            while (matcher.find()) {
+                try {
+                    String keyText = matcher.group(1);
+                    int finalI = i;
+                    paramMap.computeIfAbsent(keyText, k -> 
actualIdentifier[finalI]);
+                } catch (Exception e) {
+                    throw new IOException("param map replacement failed", e);
+                }
+                i++;
+            }
+        }
+
+        matcher = REGEX_PATTERN.matcher(pattern);
+        StringBuffer sb = new StringBuffer();
+        while (matcher.find()) {
+            String keyText = matcher.group(1);
+            String replacement = paramMap.get(keyText);
+            if (replacement == null) {
+                continue;
+            }
+            matcher.appendReplacement(sb, replacement);
+        }
+        matcher.appendTail(sb);
+        return sb.toString();
+    }
+
     public void setDirtyOptions(DirtyOptions dirtyOptions) {
         this.dirtyOptions = dirtyOptions;
     }
diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
new file mode 100644
index 000000000..f6b21ed26
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.dirty;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+@Slf4j
+public class RegexReplaceTest {
+
+    @Test
+    public void testRegexReplacement() throws IOException {
+        String[] identifier = new String[2];
+        identifier[0] = "yizhouyang";
+        identifier[1] = "table2";
+        String pattern = "${database}-${table}-${DIRTY_MESSAGE}";
+        String answer = DirtySinkHelper.regexReplace(pattern, 
DirtyType.BATCH_LOAD_ERROR, "mock message", identifier);
+        Assert.assertEquals("yizhouyang-table2-mock message", answer);
+    }
+}
diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 0878124f5..93e68370a 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -499,8 +499,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         }
 
         if (multipleSink) {
-            handleMultipleDirtyData(dirtyData, dirtyType, e);
-            return;
+            if (dirtyType == DirtyType.DESERIALIZE_ERROR) {
+                LOG.error("database and table can't be identified, will use 
default ${database}${table}");
+            } else {
+                handleMultipleDirtyData(dirtyData, dirtyType, e);
+                return;
+            }
         }
 
         if (dirtySink != null) {
diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index ead5b58a3..698290f91 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -572,7 +572,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                         outputMetrics(tableIdentifier, 
Long.valueOf(tableIdRecordList.size()),
                                 1L, true);
                         if 
(!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP))
 {
-                            dirtySinkHelper.invokeMultiple(record, 
DirtyType.RETRY_LOAD_ERROR, tableException,
+                            dirtySinkHelper.invokeMultiple(
+                                    tableIdentifier, record.toString(),
+                                    DirtyType.RETRY_LOAD_ERROR, tableException,
                                     sinkMultipleFormat);
                         }
                         tableExceptionMap.put(tableIdentifier, tableException);
diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
index 19c762192..6e10725e8 100644
--- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -451,12 +451,17 @@ public class StarRocksSinkManager implements Serializable {
         // archive dirty data
         if 
(StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat()))
 {
             for (byte[] row : flushData.getBuffer()) {
-                dirtySinkHelper.invokeMultiple(new String(row, 
StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e,
+                dirtySinkHelper.invokeMultiple(
+                        flushData.getDatabase() + "." + flushData.getTable(),
+                        new String(row, StandardCharsets.UTF_8),
+                        DirtyType.BATCH_LOAD_ERROR, e,
                         sinkMultipleFormat);
             }
         } else if 
(StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat()))
 {
             for (byte[] row : flushData.getBuffer()) {
-                dirtySinkHelper.invokeMultiple(OBJECT_MAPPER.readTree(new 
String(row, StandardCharsets.UTF_8)),
+                dirtySinkHelper.invokeMultiple(
+                        flushData.getDatabase() + "." + flushData.getTable(),
+                        OBJECT_MAPPER.readTree(new String(row, 
StandardCharsets.UTF_8)).toString(),
                         DirtyType.BATCH_LOAD_ERROR, e, sinkMultipleFormat);
             }
         }

Reply via email to