This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d0124270a [INLONG-6899][Sort] StarRocks supports table level metric 
(#6903)
d0124270a is described below

commit d0124270a7b7778093c33e87288e0c4e06258cac
Author: Liao Rui <liao...@users.noreply.github.com>
AuthorDate: Wed Dec 21 14:27:47 2022 +0800

    [INLONG-6899][Sort] StarRocks supports table level metric (#6903)
---
 .../sort/base/metric/sub/SinkSubMetricData.java    |  11 +-
 .../sort/base/metric/sub/SinkTableMetricData.java  | 216 +++++++++++++++++++++
 .../sort/base/metric/sub/SinkTopicMetricData.java  |   2 +-
 .../inlong/sort/base/util/MetricStateUtils.java    |   2 +-
 .../starrocks/manager/StarRocksSinkManager.java    |  13 +-
 .../table/sink/StarRocksDynamicSinkFunction.java   |  10 +-
 6 files changed, 240 insertions(+), 14 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java
index 143a925ca..3f863a317 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java
@@ -17,10 +17,12 @@
 
 package org.apache.inlong.sort.base.metric.sub;
 
-import org.apache.inlong.sort.base.metric.SinkMetricData;
-
 import java.util.Map;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
 
+/**
+ * A collection class for handling sub metrics
+ */
 public interface SinkSubMetricData {
 
     /**
@@ -28,6 +30,5 @@ public interface SinkSubMetricData {
      *
      * @return The sub sink metric map
      */
-    Map<String, SinkMetricData> getSubSourceMetricMap();
-
-}
+    Map<String, SinkMetricData> getSubSinkMetricMap();
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
new file mode 100644
index 000000000..82dcf74f1
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric.sub;
+
+import static org.apache.inlong.sort.base.Constants.DELIMITER;
+import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
+import com.google.common.collect.Maps;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.inlong.sort.base.Constants;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A collection class for handling sub metrics of table schema type.
+ *
+ * <p>Besides acting as a regular {@link SinkMetricData}, an instance keeps one sub
+ * {@link SinkMetricData} per record schema identify (see
+ * {@link #buildSchemaIdentify(String, String, String)}), so that metrics can be reported both in
+ * total and per table. The sub metric container is a plain hash map and no synchronization is
+ * performed by this class.
+ */
+public class SinkTableMetricData extends SinkMetricData implements SinkSubMetricData {
+
+    public static final Logger LOGGER = LoggerFactory.getLogger(SinkTableMetricData.class);
+
+    /**
+     * The sub sink metric data container of sink metric data, keyed by the record schema identify
+     */
+    private final Map<String, SinkMetricData> subSinkMetricMap = Maps.newHashMap();
+
+    /**
+     * Creates the metric data by delegating to the {@link SinkMetricData} constructor.
+     *
+     * @param option the metric option
+     * @param metricGroup the metric group
+     */
+    public SinkTableMetricData(MetricOption option, MetricGroup metricGroup) {
+        super(option, metricGroup);
+    }
+
+    /**
+     * Register sub sink metrics group from metric state.
+     *
+     * <p>Every entry of the sub metric state map is turned into a sub sink metric data whose
+     * initial values are read from that entry. Nothing is registered when the state, or its sub
+     * metric state map, is null.
+     *
+     * @param metricState MetricState, may be null
+     */
+    public void registerSubMetricsGroup(MetricState metricState) {
+        if (metricState == null) {
+            return;
+        }
+
+        // register sub sink metric data
+        Map<String, MetricState> subMetricStateMap = metricState.getSubMetricStateMap();
+        if (subMetricStateMap == null) {
+            return;
+        }
+        for (Entry<String, MetricState> subMetricStateEntry : subMetricStateMap.entrySet()) {
+            final String identify = subMetricStateEntry.getKey();
+            final MetricState subMetricState = subMetricStateEntry.getValue();
+            String[] schemaInfoArray = parseSchemaIdentify(identify);
+            SinkMetricData subSinkMetricData = buildSubSinkMetricData(schemaInfoArray, subMetricState, this);
+            subSinkMetricMap.put(identify, subSinkMetricData);
+        }
+        LOGGER.info("register subMetricsGroup from metricState,sub metric map size:{}", subSinkMetricMap.size());
+    }
+
+    /**
+     * Build sub sink metric data without any restored state, i.e. all initial values are zero.
+     *
+     * @param schemaInfoArray sink record schema info
+     * @param sinkMetricData sink metric data
+     * @return sub sink metric data
+     */
+    private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, SinkMetricData sinkMetricData) {
+        return buildSubSinkMetricData(schemaInfoArray, null, sinkMetricData);
+    }
+
+    /**
+     * Build sub sink metric data.
+     *
+     * <p>The labels of the sub metric are the labels of {@code sinkMetricData} followed by the
+     * database/table labels (array of length 2) or the database/schema/table labels (array of
+     * length 3). For any other array length no schema labels are appended.
+     *
+     * @param schemaInfoArray the schema info array of record
+     * @param subMetricState sub metric state used for the initial values, null means start from zero
+     * @param sinkMetricData sink metric data
+     * @return sub sink metric data, or null if {@code sinkMetricData} or {@code schemaInfoArray} is null
+     */
+    private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, MetricState subMetricState,
+            SinkMetricData sinkMetricData) {
+        if (sinkMetricData == null || schemaInfoArray == null) {
+            return null;
+        }
+        // build sub sink metric data
+        Map<String, String> labels = sinkMetricData.getLabels();
+        String metricGroupLabels = labels.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue())
+                .collect(Collectors.joining(DELIMITER));
+        StringBuilder labelBuilder = new StringBuilder(metricGroupLabels);
+        if (schemaInfoArray.length == 2) {
+            labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+                    .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]);
+        } else if (schemaInfoArray.length == 3) {
+            labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0])
+                    .append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1])
+                    .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]);
+        }
+
+        final boolean restored = subMetricState != null;
+        MetricOption metricOption = MetricOption.builder()
+                .withInitRecords(restored ? subMetricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+                .withInitBytes(restored ? subMetricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+                .withInitDirtyRecords(restored ? subMetricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L)
+                .withInitDirtyBytes(restored ? subMetricState.getMetricValue(DIRTY_BYTES_OUT) : 0L)
+                .withInlongLabels(labelBuilder.toString()).withRegisterMetric(RegisteredMetric.ALL).build();
+        return new SinkTableMetricData(metricOption, sinkMetricData.getMetricGroup());
+    }
+
+    /**
+     * Build record schema identify: database, schema and table joined with
+     * {@link Constants#SEMICOLON}, or only database and table when there is no schema.
+     *
+     * @param database the database name of record
+     * @param schema the schema name of record, null if the record has no schema level
+     * @param table the table name of record
+     * @return the record schema identify
+     */
+    public String buildSchemaIdentify(String database, String schema, String table) {
+        if (schema == null) {
+            return database + Constants.SEMICOLON + table;
+        }
+        return database + Constants.SEMICOLON + schema + Constants.SEMICOLON + table;
+    }
+
+    /**
+     * Parse record schema identify by splitting it with {@link Constants#SPILT_SEMICOLON}.
+     *
+     * @param schemaIdentify the schema identify of record
+     * @return the parts of the identify; for an identify produced by
+     *         {@link #buildSchemaIdentify(String, String, String)} this is expected to be
+     *         String[]{database,table} or String[]{database,schema,table}
+     */
+    public String[] parseSchemaIdentify(String schemaIdentify) {
+        return schemaIdentify.split(Constants.SPILT_SEMICOLON);
+    }
+
+    /**
+     * Output metrics with estimate for a single record. The record counts as one row and its
+     * size is the UTF-8 byte length of {@code data.toString()}, or zero when {@code data} is null.
+     *
+     * @param database the database name of record
+     * @param schema the schema name of record
+     * @param table the table name of record
+     * @param isSnapshotRecord is it snapshot record
+     * @param data the data of record, may be null
+     */
+    public void outputMetricsWithEstimate(String database, String schema, String table, boolean isSnapshotRecord,
+            Object data) {
+        // sink metric and sub sink metric output metrics
+        long rowCountSize = 1L;
+        long rowDataSize = 0L;
+        if (data != null) {
+            rowDataSize = data.toString().getBytes(StandardCharsets.UTF_8).length;
+        }
+        outputMetricsWithEstimate(database, schema, table, isSnapshotRecord, rowCountSize, rowDataSize);
+    }
+
+    /**
+     * Output metrics with estimate.
+     *
+     * <p>When database or table is blank only the total metric is updated. Otherwise both the
+     * total metric and the sub metric of the table are updated; the sub metric is created on
+     * first use.
+     *
+     * @param database the database name of record
+     * @param schema the schema name of record, null if the record has no schema level
+     * @param table the table name of record
+     * @param isSnapshotRecord is it snapshot record (not read by this method)
+     * @param rowCount the row count of records
+     * @param rowSize the row size of records
+     */
+    public void outputMetricsWithEstimate(String database, String schema, String table, boolean isSnapshotRecord,
+            long rowCount, long rowSize) {
+        if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) {
+            invoke(rowCount, rowSize);
+            return;
+        }
+        String identify = buildSchemaIdentify(database, schema, table);
+        SinkMetricData subSinkMetricData = subSinkMetricMap.get(identify);
+        if (subSinkMetricData == null) {
+            // Leave the schema element out when there is none. A three element array with a null
+            // schema would add a schema label with the literal value "null", and the labels would
+            // differ from the ones built in registerSubMetricsGroup, where the two part identify
+            // of the same table is parsed into a two element array.
+            String[] schemaInfoArray = schema == null
+                    ? new String[]{database, table}
+                    : new String[]{database, schema, table};
+            subSinkMetricData = buildSubSinkMetricData(schemaInfoArray, this);
+            subSinkMetricMap.put(identify, subSinkMetricData);
+        }
+        // sink metric and sub sink metric output metrics
+        this.invoke(rowCount, rowSize);
+        subSinkMetricData.invoke(rowCount, rowSize);
+    }
+
+    /**
+     * Output metrics with estimate on the total metric only. The record counts as one row and
+     * its size is the UTF-8 byte length of {@code data.toString()}, or zero when {@code data} is
+     * null (same convention as the table level overload).
+     *
+     * @param data the data of record, may be null
+     */
+    public void outputMetricsWithEstimate(Object data) {
+        long size = 0L;
+        if (data != null) {
+            size = data.toString().getBytes(StandardCharsets.UTF_8).length;
+        }
+        invoke(1, size);
+    }
+
+    /**
+     * Get the sub sink metric map. The returned map is the internal container itself, not a copy.
+     *
+     * @return The sub sink metric map
+     */
+    @Override
+    public Map<String, SinkMetricData> getSubSinkMetricMap() {
+        return this.subSinkMetricMap;
+    }
+
+    @Override
+    public String toString() {
+        return "SinkTableMetricData{" + "subSinkMetricMap=" + subSinkMetricMap + '}';
+    }
+}
\ No newline at end of file
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java
index 98735f053..ebe2048ea 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java
@@ -96,7 +96,7 @@ public class SinkTopicMetricData extends SinkMetricData 
implements SinkSubMetric
     }
 
     @Override
-    public Map<String, SinkMetricData> getSubSourceMetricMap() {
+    public Map<String, SinkMetricData> getSubSinkMetricMap() {
         return this.sinkMetricMap;
     }
 }
diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
index 4bf1b8930..95aa3c477 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
@@ -249,7 +249,7 @@ public class MetricStateUtils {
         }
         SinkSubMetricData sinkSubMetricData = (SinkSubMetricData) 
sinkMetricData;
 
-        Map<String, SinkMetricData> subSinkMetricMap = 
sinkSubMetricData.getSubSourceMetricMap();
+        Map<String, SinkMetricData> subSinkMetricMap = 
sinkSubMetricData.getSubSinkMetricMap();
         if (subSinkMetricMap != null && !subSinkMetricMap.isEmpty()) {
             Map<String, MetricState> subMetricStateMap = new HashMap<>();
             Set<Entry<String, SinkMetricData>> entries = 
subSinkMetricMap.entrySet();
diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
index a6952eb6d..39d77c472 100644
--- 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java
@@ -51,7 +51,7 @@ import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,14 +115,14 @@ public class StarRocksSinkManager implements Serializable 
{
 
     private final boolean multipleSink;
     private final SchemaUpdateExceptionPolicy schemaUpdatePolicy;
-    private transient SinkMetricData metricData;
+    private transient SinkTableMetricData metricData;
 
     /**
      * If a table writing throws exception, ignore it when receiving data 
later again
      */
     private Set<String> ignoreWriteTables = new HashSet<>();
 
-    public void setSinkMetricData(SinkMetricData metricData) {
+    public void setSinkMetricData(SinkTableMetricData metricData) {
         this.metricData = metricData;
     }
 
@@ -391,7 +391,12 @@ public class StarRocksSinkManager implements Serializable {
                     updateMetricsFromStreamLoadResult(result);
 
                     if (null != metricData) {
-                        metricData.invoke(flushData.getBatchCount(), 
flushData.getBatchSize());
+                        if (multipleSink) {
+                            
metricData.outputMetricsWithEstimate(flushData.getDatabase(), null, 
flushData.getTable(),
+                                    false, flushData.getBatchCount(), 
flushData.getBatchSize());
+                        } else {
+                            metricData.invoke(flushData.getBatchCount(), 
flushData.getBatchSize());
+                        }
                     }
                 }
                 startScheduler();
diff --git 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
index 93a1363ff..422715d53 100644
--- 
a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java
@@ -63,7 +63,7 @@ import 
org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
 import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricState;
-import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.starrocks.manager.StarRocksSinkManager;
@@ -95,7 +95,7 @@ public class StarRocksDynamicSinkFunction<T> extends 
RichSinkFunction<T> impleme
     private final String tablePattern;
 
     private final String inlongMetric;
-    private transient SinkMetricData metricData;
+    private transient SinkTableMetricData metricData;
     private transient ListState<MetricState> metricStateListState;
     private transient MetricState metricState;
     private final String auditHostAndPorts;
@@ -152,7 +152,11 @@ public class StarRocksDynamicSinkFunction<T> extends 
RichSinkFunction<T> impleme
                 .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
                 .withRegisterMetric(MetricOption.RegisteredMetric.ALL).build();
         if (metricOption != null) {
-            metricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+            metricData = new SinkTableMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+            if (multipleSink) {
+                // register sub sink metric data from metric state
+                metricData.registerSubMetricsGroup(metricState);
+            }
             sinkManager.setSinkMetricData(metricData);
         }
     }

Reply via email to