This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 9c6908e2b [INLONG-7292][Sort] Accurately detect and archive dirty data for Doris/StarRocks/JDBC (#7289) 9c6908e2b is described below commit 9c6908e2b236e35f9b102499243f6f9a191d466a Author: Yizhou Yang <32808678+yizhou-y...@users.noreply.github.com> AuthorDate: Fri Feb 3 10:30:53 2023 +0800 [INLONG-7292][Sort] Accurately detect and archive dirty data for Doris/StarRocks/JDBC (#7289) --- .../inlong/sort/base/dirty/DirtySinkHelper.java | 124 ++++++++++++++++++--- .../inlong/sort/base/dirty/RegexReplaceTest.java | 38 +++++++ .../table/DorisDynamicSchemaOutputFormat.java | 8 +- .../internal/JdbcMultiBatchingOutputFormat.java | 4 +- .../starrocks/manager/StarRocksSinkManager.java | 9 +- 5 files changed, 162 insertions(+), 21 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java index e815db856..3f3b14b6a 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtySinkHelper.java @@ -28,7 +28,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.io.IOException; import java.io.Serializable; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** * Dirty sink helper, it helps dirty data sink for {@link DirtySink} @@ -38,6 +45,7 @@ public class DirtySinkHelper<T> implements Serializable { private static final long serialVersionUID = 1L; private static final Logger LOGGER = LoggerFactory.getLogger(DirtySinkHelper.class); + static final Pattern REGEX_PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE); private DirtyOptions dirtyOptions; private final @Nullable DirtySink<T> dirtySink; @@ -97,10 +105,8 @@ public class DirtySinkHelper<T> implements Serializable { } } - public void invokeMultiple(T dirtyData, DirtyType dirtyType, Throwable e, + public void invokeMultiple(String tableIdentifier, T dirtyData, DirtyType dirtyType, Throwable e, String sinkMultipleFormat) { - JsonDynamicSchemaFormat jsonDynamicSchemaFormat = - (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat); if (!dirtyOptions.ignoreDirty()) { RuntimeException ex; if (e instanceof RuntimeException) { @@ -110,22 +116,60 @@ public class DirtySinkHelper<T> implements Serializable { } throw ex; } - if (dirtySink != null) { - JsonNode rootNode; - DirtyData.Builder<T> builder = DirtyData.builder(); - try { + + JsonDynamicSchemaFormat jsonDynamicSchemaFormat = + (JsonDynamicSchemaFormat) DynamicSchemaFormatFactory.getFormat(sinkMultipleFormat); + JsonNode rootNode = null; + String[] actualIdentifier = tableIdentifier.split("\\.");; + + try { + // for rowdata where identifier is not the first element, append identifier and SEPARATOR before it. + if (dirtyData instanceof RowData) { rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) dirtyData).getBinary(0)); - } catch (Exception ex) { - invoke(dirtyData, DirtyType.DESERIALIZE_ERROR, e); - return; + handleDirty(dirtyType, e, null, rootNode, jsonDynamicSchemaFormat, dirtyData); + } else if (dirtyData instanceof JsonNode) { + rootNode = (JsonNode) dirtyData; + handleDirty(dirtyType, e, null, rootNode, jsonDynamicSchemaFormat, dirtyData); + } else if (dirtyData instanceof String) { + handleDirty(dirtyType, e, actualIdentifier, null, jsonDynamicSchemaFormat, dirtyData); + } else { + throw new Exception("unidentified dirty data " + dirtyData); } + } catch (Exception ex) { + LOGGER.warn("parse dirty data {} of class {} failed", dirtyData, dirtyData.getClass()); + invoke(dirtyData, DirtyType.DESERIALIZE_ERROR, e); + } + } + + private void handleDirty(DirtyType dirtyType, Throwable e, + String[] actualIdentifier, JsonNode rootNode, JsonDynamicSchemaFormat jsonDynamicSchemaFormat, + T dirtyData) { + if (dirtySink != null) { + DirtyData.Builder<T> builder = DirtyData.builder(); try { - builder.setData(dirtyData) - .setDirtyType(dirtyType) - .setLabels(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels())) - .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag())) - .setDirtyMessage(e.getMessage()) - .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier())); + if (rootNode != null) { + String labels = regexReplace(dirtyOptions.getLabels(), dirtyType, e.getMessage(), null); + String logTag = regexReplace(dirtyOptions.getLogTag(), dirtyType, e.getMessage(), null); + String identifier = regexReplace(dirtyOptions.getIdentifier(), dirtyType, e.getMessage(), null); + builder.setData(dirtyData) + .setDirtyType(dirtyType) + .setLabels(jsonDynamicSchemaFormat.parse(rootNode, labels)) + .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, logTag)) + .setDirtyMessage(e.getMessage()) + .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, identifier)); + } else { + // for dirty data without proper rootnode, parse completely into string literals + String labels = regexReplace(dirtyOptions.getLabels(), dirtyType, e.getMessage(), actualIdentifier); + String logTag = regexReplace(dirtyOptions.getLogTag(), dirtyType, e.getMessage(), actualIdentifier); + String identifier = + regexReplace(dirtyOptions.getIdentifier(), dirtyType, e.getMessage(), actualIdentifier); + builder.setData(dirtyData) + .setDirtyType(dirtyType) + .setLabels(labels) + .setLogTag(logTag) + .setDirtyMessage(e.getMessage()) + .setIdentifier(identifier); + } dirtySink.invoke(builder.build()); } catch (Exception ex) { if (!dirtyOptions.ignoreSideOutputErrors()) { @@ -136,6 +180,54 @@ public class DirtySinkHelper<T> implements Serializable { } } + public static String regexReplace(String pattern, DirtyType dirtyType, + String dirtyMessage, String[] actualIdentifier) throws IOException { + + if (pattern == null) { + return null; + } + + final String DIRTY_TYPE_KEY = "DIRTY_TYPE"; + final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE"; + final String SYSTEM_TIME_KEY = "SYSTEM_TIME"; + final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + + Map<String, String> paramMap = new HashMap<>(); + paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); + paramMap.put(DIRTY_TYPE_KEY, dirtyType.format()); + paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); + + Matcher matcher = REGEX_PATTERN.matcher(pattern); + + // if RootNode is not available, generate a complete paramMap with {database} {table},etc. + if (actualIdentifier != null) { + int i = 0; + while (matcher.find()) { + try { + String keyText = matcher.group(1); + int finalI = i; + paramMap.computeIfAbsent(keyText, k -> actualIdentifier[finalI]); + } catch (Exception e) { + throw new IOException("param map replacement failed", e); + } + i++; + } + } + + matcher = REGEX_PATTERN.matcher(pattern); + StringBuffer sb = new StringBuffer(); + while (matcher.find()) { + String keyText = matcher.group(1); + String replacement = paramMap.get(keyText); + if (replacement == null) { + continue; + } + matcher.appendReplacement(sb, replacement); + } + matcher.appendTail(sb); + return sb.toString(); + } + public void setDirtyOptions(DirtyOptions dirtyOptions) { this.dirtyOptions = dirtyOptions; } diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java new file mode 100644 index 000000000..f6b21ed26 --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/RegexReplaceTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.dirty; + +import lombok.extern.slf4j.Slf4j; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +@Slf4j +public class RegexReplaceTest { + + @Test + public void testRegexReplacement() throws IOException { + String[] identifier = new String[2]; + identifier[0] = "yizhouyang"; + identifier[1] = "table2"; + String pattern = "${database}-${table}-${DIRTY_MESSAGE}"; + String answer = DirtySinkHelper.regexReplace(pattern, DirtyType.BATCH_LOAD_ERROR, "mock message", identifier); + Assert.assertEquals("yizhouyang-table2-mock message", answer); + } +} diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java index 0878124f5..93e68370a 100644 --- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java +++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java @@ -499,8 +499,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> { } if (multipleSink) { - handleMultipleDirtyData(dirtyData, dirtyType, e); - return; + if (dirtyType == DirtyType.DESERIALIZE_ERROR) { + LOG.error("database and table can't be identified, will use default ${database}${table}"); + } else { + handleMultipleDirtyData(dirtyData, dirtyType, e); + return; + } } if (dirtySink != null) { diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java index ead5b58a3..698290f91 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java @@ -572,7 +572,9 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc outputMetrics(tableIdentifier, Long.valueOf(tableIdRecordList.size()), 1L, true); if (!schemaUpdateExceptionPolicy.equals(SchemaUpdateExceptionPolicy.THROW_WITH_STOP)) { - dirtySinkHelper.invokeMultiple(record, DirtyType.RETRY_LOAD_ERROR, tableException, + dirtySinkHelper.invokeMultiple( + tableIdentifier, record.toString(), + DirtyType.RETRY_LOAD_ERROR, tableException, sinkMultipleFormat); } tableExceptionMap.put(tableIdentifier, tableException); diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java index 19c762192..6e10725e8 100644 --- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java +++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java @@ -451,12 +451,17 @@ public class StarRocksSinkManager implements Serializable { // archive dirty data if (StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat())) { for (byte[] row : flushData.getBuffer()) { - dirtySinkHelper.invokeMultiple(new String(row, StandardCharsets.UTF_8), DirtyType.BATCH_LOAD_ERROR, e, + dirtySinkHelper.invokeMultiple( + flushData.getDatabase() + "." + flushData.getTable(), + new String(row, StandardCharsets.UTF_8), + DirtyType.BATCH_LOAD_ERROR, e, sinkMultipleFormat); } } else if (StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat())) { for (byte[] row : flushData.getBuffer()) { - dirtySinkHelper.invokeMultiple(OBJECT_MAPPER.readTree(new String(row, StandardCharsets.UTF_8)), + dirtySinkHelper.invokeMultiple( + flushData.getDatabase() + "." + flushData.getTable(), + OBJECT_MAPPER.readTree(new String(row, StandardCharsets.UTF_8)).toString(), DirtyType.BATCH_LOAD_ERROR, e, sinkMultipleFormat); } }