This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 659385a0bdad7b1eaf264273efe24c9731f33a5e
Author: Yizhou Yang <32808678+yizhou-y...@users.noreply.github.com>
AuthorDate: Tue Jan 3 14:06:22 2023 +0800

    [INLONG-7061][Sort] Support table level metrics for Apache Doris connector 
and add dirty metrics (#7062)
---
 .../sort/base/dirty/sink/s3/S3DirtySink.java       |  3 +-
 .../inlong/sort/base/dirty/sink/s3/S3Helper.java   |  2 +-
 .../sort/base/metric/sub/SinkTableMetricData.java  | 28 +++++++++++++
 .../table/DorisDynamicSchemaOutputFormat.java      | 48 +++++++++++++++++++++-
 4 files changed, 78 insertions(+), 3 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
index ab8fc9464..b8f1f5f10 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java
@@ -22,6 +22,7 @@ import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter;
@@ -244,7 +245,7 @@ public class S3DirtySink<T> implements DirtySink<T> {
         }
         String content = null;
         try {
-            content = StringUtils.join(values, s3Options.getLineDelimiter());
+            content = StringUtils.join(values, 
StringEscapeUtils.unescapeJava(s3Options.getLineDelimiter()));
             s3Helper.upload(identifier, content);
             LOGGER.info("Write {} records to s3 of identifier: {}", 
values.size(), identifier);
             writeOutNum.addAndGet(values.size());
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
index d79b8aecd..f925d76e8 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3Helper.java
@@ -39,7 +39,7 @@ public class S3Helper implements Serializable {
     private static final DateTimeFormatter DATE_TIME_FORMAT = 
DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
 
     private static final int SEQUENCE_LENGTH = 4;
-    private static final String ESCAPE_PATTERN = "[\\pP\\p{Punct}\\s]";
+    private static final String ESCAPE_PATTERN = "[,,+=: ;()()。/.;]";
     private static final String FILE_NAME_SUFFIX = ".txt";
     private final Random r = new Random();
     private final AmazonS3 s3Client;
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index cf5285b13..a5690a5b5 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -196,6 +196,34 @@ public class SinkTableMetricData extends SinkMetricData 
implements SinkSubMetric
         subSinkMetricData.invoke(rowCount, rowSize);
     }
 
+    /**
+     * Output dirty metrics with estimate, for both this sink metric and the sub sink metric of the table.
+     * If the database or the table is blank, only this sink metric is updated.
+     *
+     * @param database the database name of record
+     * @param table the table name of record
+     * @param rowCount the row count of records
+     * @param rowSize the row size of records
+     */
+    public void outputDirtyMetricsWithEstimate(String database, String table, 
long rowCount,
+            long rowSize) {
+        if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) {
+            invokeDirty(rowCount, rowSize);
+            return;
+        }
+        String identify = buildSchemaIdentify(database, null, table); // the schema part is passed as null
+        SinkMetricData subSinkMetricData;
+        if (subSinkMetricMap.containsKey(identify)) {
+            subSinkMetricData = subSinkMetricMap.get(identify);
+        } else {
+            subSinkMetricData = buildSubSinkMetricData(new String[]{database, 
table}, this);
+            subSinkMetricMap.put(identify, subSinkMetricData);
+        }
+        // report the dirty rows on this sink metric and on the sub sink metric of the identify
+        this.invokeDirty(rowCount, rowSize);
+        subSinkMetricData.invokeDirty(rowCount, rowSize);
+    }
+
     public void outputMetricsWithEstimate(Object data) {
         long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
         invoke(1, size);
diff --git 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index 1b9bc4c48..efa04a630 100644
--- 
a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ 
b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -76,6 +76,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
 import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
@@ -141,7 +143,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
     private transient MetricState metricState;
     private final String[] fieldNames;
     private volatile boolean jsonFormat;
-    private String keysType;
     private volatile RowData.FieldGetter[] fieldGetters;
     private String fieldDelimiter;
     private String lineDelimiter;
@@ -267,6 +268,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
                 .withInlongAudit(auditHostAndPorts)
                 .withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(metricState != null ? 
metricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(metricState != null ? 
metricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
                 .withRegisterMetric(MetricOption.RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
@@ -486,6 +489,12 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
             }
             throw ex;
         }
+
+        if (multipleSink) {
+            handleMultipleDirtyData(dirtyData, dirtyType, e);
+            return;
+        }
+
         if (dirtySink != null) {
             DirtyData.Builder<Object> builder = DirtyData.builder();
             try {
@@ -503,6 +512,43 @@ public class DorisDynamicSchemaOutputFormat<T> extends 
RichOutputFormat<T> {
                 LOG.warn("Dirty sink failed", ex);
             }
         }
+        metricData.invokeDirty(1, 
dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
+    }
+
+    private void handleMultipleDirtyData(Object dirtyData, DirtyType 
dirtyType, Exception e) {
+        JsonNode rootNode;
+        try {
+            rootNode = jsonDynamicSchemaFormat.deserialize(((RowData) 
dirtyData).getBinary(0));
+        } catch (Exception ex) {
+            handleDirtyData(dirtyData, DirtyType.DESERIALIZE_ERROR, e);
+            return;
+        }
+
+        if (dirtySink != null) {
+            DirtyData.Builder<Object> builder = DirtyData.builder();
+            try {
+                builder.setData(dirtyData)
+                        .setDirtyType(dirtyType)
+                        .setLabels(jsonDynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getLabels()))
+                        .setLogTag(jsonDynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getLogTag()))
+                        .setDirtyMessage(e.getMessage())
+                        .setIdentifier(jsonDynamicSchemaFormat.parse(rootNode, 
dirtyOptions.getIdentifier()));
+                dirtySink.invoke(builder.build());
+            } catch (Exception ex) {
+                if (!dirtyOptions.ignoreSideOutputErrors()) {
+                    throw new RuntimeException(ex);
+                }
+                LOG.warn("Dirty sink failed", ex);
+            }
+        }
+        try {
+            metricData.outputDirtyMetricsWithEstimate(
+                    jsonDynamicSchemaFormat.parse(rootNode, databasePattern),
+                    jsonDynamicSchemaFormat.parse(rootNode, tablePattern), 1,
+                    ((RowData) dirtyData).getBinary(0).length);
+        } catch (Exception ex) {
+            metricData.invokeDirty(1, 
dirtyData.toString().getBytes(StandardCharsets.UTF_8).length);
+        }
     }
 
     private void handleColumnsChange(String tableIdentifier, JsonNode 
rootNode, JsonNode physicalData) {

Reply via email to