This is an automated email from the ASF dual-hosted git repository. pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d0124270a [INLONG-6899][Sort] StarRocks supports table level metric (#6903) d0124270a is described below commit d0124270a7b7778093c33e87288e0c4e06258cac Author: Liao Rui <liao...@users.noreply.github.com> AuthorDate: Wed Dec 21 14:27:47 2022 +0800 [INLONG-6899][Sort] StarRocks supports table level metric (#6903) --- .../sort/base/metric/sub/SinkSubMetricData.java | 11 +- .../sort/base/metric/sub/SinkTableMetricData.java | 216 +++++++++++++++++++++ .../sort/base/metric/sub/SinkTopicMetricData.java | 2 +- .../inlong/sort/base/util/MetricStateUtils.java | 2 +- .../starrocks/manager/StarRocksSinkManager.java | 13 +- .../table/sink/StarRocksDynamicSinkFunction.java | 10 +- 6 files changed, 240 insertions(+), 14 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java index 143a925ca..3f863a317 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkSubMetricData.java @@ -17,10 +17,12 @@ package org.apache.inlong.sort.base.metric.sub; -import org.apache.inlong.sort.base.metric.SinkMetricData; - import java.util.Map; +import org.apache.inlong.sort.base.metric.SinkMetricData; +/** + * A collection class for handling sub metrics + */ public interface SinkSubMetricData { /** @@ -28,6 +30,5 @@ public interface SinkSubMetricData { * * @return The sub sink metric map */ - Map<String, SinkMetricData> getSubSourceMetricMap(); - -} + Map<String, SinkMetricData> getSubSinkMetricMap(); +} \ No newline at end of file diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java new file mode 100644 index 000000000..82dcf74f1 --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.metric.sub; + +import static org.apache.inlong.sort.base.Constants.DELIMITER; +import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + +import com.google.common.collect.Maps; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.metrics.MetricGroup; +import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A collection class for handling sub metrics of table schema type + */ +public class SinkTableMetricData extends SinkMetricData implements SinkSubMetricData { + + public static final Logger LOGGER = LoggerFactory.getLogger(SinkTableMetricData.class); + + /** + * The sub sink metric data container of sink metric data + */ + private final Map<String, SinkMetricData> subSinkMetricMap = Maps.newHashMap(); + + public SinkTableMetricData(MetricOption option, MetricGroup metricGroup) { + super(option, metricGroup); + } + + /** + * register sub sink metrics group from metric state + * + * @param metricState MetricState + */ + public void registerSubMetricsGroup(MetricState metricState) { + if (metricState == null) { + return; + } + + // register sub sink metric data + if (metricState.getSubMetricStateMap() == null) { + return; + } + Map<String, MetricState> subMetricStateMap = metricState.getSubMetricStateMap(); + for (Entry<String, MetricState> subMetricStateEntry : subMetricStateMap.entrySet()) { + String[] schemaInfoArray = parseSchemaIdentify(subMetricStateEntry.getKey()); + final MetricState subMetricState = subMetricStateEntry.getValue(); + SinkMetricData subSinkMetricData = buildSubSinkMetricData(schemaInfoArray, subMetricState, this); + subSinkMetricMap.put(subMetricStateEntry.getKey(), subSinkMetricData); + } + LOGGER.info("register subMetricsGroup from metricState,sub metric map size:{}", subSinkMetricMap.size()); + } + + /** + * build sub sink metric data + * + * @param schemaInfoArray sink record schema info + * @param sinkMetricData sink metric data + * @return sub sink metric data + */ + private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, SinkMetricData sinkMetricData) { + return buildSubSinkMetricData(schemaInfoArray, null, sinkMetricData); + } + + /** + * build sub sink metric data + * + * @param schemaInfoArray the schema info array of record + * @param subMetricState sub metric state + * @param sinkMetricData sink metric data + * @return sub sink metric data + */ + private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, MetricState subMetricState, + SinkMetricData sinkMetricData) { + if (sinkMetricData == null || schemaInfoArray == null) { + return null; + } + // build sub sink metric data + Map<String, String> labels = sinkMetricData.getLabels(); + String metricGroupLabels = labels.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining(DELIMITER)); + StringBuilder labelBuilder = new StringBuilder(metricGroupLabels); + if (schemaInfoArray.length == 2) { + labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0]) + .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]); + } else if (schemaInfoArray.length == 3) { + labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0]) + .append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1]) + .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]); + } + + MetricOption metricOption = MetricOption.builder() + .withInitRecords(subMetricState != null ? subMetricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(subMetricState != null ? subMetricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withInitDirtyRecords(subMetricState != null ? subMetricState.getMetricValue(DIRTY_RECORDS_OUT) : 0L) + .withInitDirtyBytes(subMetricState != null ? subMetricState.getMetricValue(DIRTY_BYTES_OUT) : 0L) + .withInlongLabels(labelBuilder.toString()).withRegisterMetric(RegisteredMetric.ALL).build(); + return new SinkTableMetricData(metricOption, sinkMetricData.getMetricGroup()); + } + + /** + * build record schema identify,in the form of database.schema.table or database.table + * + * @param database the database name of record + * @param schema the schema name of record + * @param table the table name of record + * @return the record schema identify + */ + public String buildSchemaIdentify(String database, String schema, String table) { + if (schema == null) { + return database + Constants.SEMICOLON + table; + } + return database + Constants.SEMICOLON + schema + Constants.SEMICOLON + table; + } + + /** + * parse record schema identify + * + * @param schemaIdentify the schema identify of record + * @return the record schema identify array,String[]{database,table} + */ + public String[] parseSchemaIdentify(String schemaIdentify) { + return schemaIdentify.split(Constants.SPILT_SEMICOLON); + } + + /** + * output metrics with estimate + * + * @param database the database name of record + * @param schema the schema name of record + * @param table the table name of record + * @param isSnapshotRecord is it snapshot record + * @param data the data of record + */ + public void outputMetricsWithEstimate(String database, String schema, String table, boolean isSnapshotRecord, + Object data) { + // sink metric and sub sink metric output metrics + long rowCountSize = 1L; + long rowDataSize = 0L; + if (data != null) { + rowDataSize = data.toString().getBytes(StandardCharsets.UTF_8).length; + } + outputMetricsWithEstimate(database, schema, table, isSnapshotRecord, rowCountSize, rowDataSize); + } + + /** + * output metrics with estimate + * + * @param database the database name of record + * @param schema the schema name of record + * @param table the table name of record + * @param isSnapshotRecord is it snapshot record + * @param rowCount the row count of records + * @param rowSize the row size of records + */ + public void outputMetricsWithEstimate(String database, String schema, String table, boolean isSnapshotRecord, + long rowCount, long rowSize) { + if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) { + invoke(rowCount, rowSize); + return; + } + String identify = buildSchemaIdentify(database, schema, table); + SinkMetricData subSinkMetricData; + if (subSinkMetricMap.containsKey(identify)) { + subSinkMetricData = subSinkMetricMap.get(identify); + } else { + subSinkMetricData = buildSubSinkMetricData(new String[]{database, schema, table}, this); + subSinkMetricMap.put(identify, subSinkMetricData); + } + // sink metric and sub sink metric output metrics + this.invoke(rowCount, rowSize); + subSinkMetricData.invoke(rowCount, rowSize); + } + + public void outputMetricsWithEstimate(Object data) { + long size = data.toString().getBytes(StandardCharsets.UTF_8).length; + invoke(1, size); + } + + @Override + public Map<String, SinkMetricData> getSubSinkMetricMap() { + return this.subSinkMetricMap; + } + + @Override + public String toString() { + return "SinkTableMetricData{" + "subSinkMetricMap=" + subSinkMetricMap + '}'; + } +} \ No newline at end of file diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java index 98735f053..ebe2048ea 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTopicMetricData.java @@ -96,7 +96,7 @@ public class SinkTopicMetricData extends SinkMetricData implements SinkSubMetric } @Override - public Map<String, SinkMetricData> getSubSourceMetricMap() { + public Map<String, SinkMetricData> getSubSinkMetricMap() { return this.sinkMetricMap; } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java index 4bf1b8930..95aa3c477 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java @@ -249,7 +249,7 @@ public class MetricStateUtils { } SinkSubMetricData sinkSubMetricData = (SinkSubMetricData) sinkMetricData; - Map<String, SinkMetricData> subSinkMetricMap = sinkSubMetricData.getSubSourceMetricMap(); + Map<String, SinkMetricData> subSinkMetricMap = sinkSubMetricData.getSubSinkMetricMap(); if (subSinkMetricMap != null && !subSinkMetricMap.isEmpty()) { Map<String, MetricState> subMetricStateMap = new HashMap<>(); Set<Entry<String, SinkMetricData>> entries = subSinkMetricMap.entrySet(); diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java index a6952eb6d..39d77c472 100644 --- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java +++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/manager/StarRocksSinkManager.java @@ -51,7 +51,7 @@ import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.types.logical.LogicalTypeRoot; -import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData; import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,14 +115,14 @@ public class StarRocksSinkManager implements Serializable { private final boolean multipleSink; private final SchemaUpdateExceptionPolicy schemaUpdatePolicy; - private transient SinkMetricData metricData; + private transient SinkTableMetricData metricData; /** * If a table writing throws exception, ignore it when receiving data later again */ private Set<String> ignoreWriteTables = new HashSet<>(); - public void setSinkMetricData(SinkMetricData metricData) { + public void setSinkMetricData(SinkTableMetricData metricData) { this.metricData = metricData; } @@ -391,7 +391,12 @@ public class StarRocksSinkManager implements Serializable { updateMetricsFromStreamLoadResult(result); if (null != metricData) { - metricData.invoke(flushData.getBatchCount(), flushData.getBatchSize()); + if (multipleSink) { + metricData.outputMetricsWithEstimate(flushData.getDatabase(), null, flushData.getTable(), + false, flushData.getBatchCount(), flushData.getBatchSize()); + } else { + metricData.invoke(flushData.getBatchCount(), flushData.getBatchSize()); + } } } startScheduler(); diff --git a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java index 93a1363ff..422715d53 100644 --- a/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java +++ b/inlong-sort/sort-connectors/starrocks/src/main/java/org/apache/inlong/sort/starrocks/table/sink/StarRocksDynamicSinkFunction.java @@ -63,7 +63,7 @@ import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; import org.apache.inlong.sort.base.format.JsonDynamicSchemaFormat; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricState; -import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData; import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.starrocks.manager.StarRocksSinkManager; @@ -95,7 +95,7 @@ public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> impleme private final String tablePattern; private final String inlongMetric; - private transient SinkMetricData metricData; + private transient SinkTableMetricData metricData; private transient ListState<MetricState> metricStateListState; private transient MetricState metricState; private final String auditHostAndPorts; @@ -152,7 +152,11 @@ public class StarRocksDynamicSinkFunction<T> extends RichSinkFunction<T> impleme .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(MetricOption.RegisteredMetric.ALL).build(); if (metricOption != null) { - metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + metricData = new SinkTableMetricData(metricOption, getRuntimeContext().getMetricGroup()); + if (multipleSink) { + // register sub sink metric data from metric state + metricData.registerSubMetricsGroup(metricState); + } sinkManager.setSinkMetricData(metricData); } }