gong commented on code in PR #6903: URL: https://github.com/apache/inlong/pull/6903#discussion_r1050398144
########## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java: ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.metric.sub; + +import static org.apache.inlong.sort.base.Constants.DELIMITER; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; +import static org.apache.inlong.sort.base.Constants.READ_PHASE; + +import com.google.common.collect.Maps; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; +import org.apache.flink.metrics.MetricGroup; +import org.apache.inlong.sort.base.Constants; +import org.apache.inlong.sort.base.enums.ReadPhase; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.metric.phase.ReadPhaseMetricData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A collection class for handling sub metrics of table schema type + */ +public class SinkTableMetricData extends SinkMetricData implements SinkSubMetricData { + + public static final Logger LOGGER = LoggerFactory.getLogger(SinkTableMetricData.class); + + /** + * The read phase metric container of sink metric data + */ + private final Map<ReadPhase, ReadPhaseMetricData> readPhaseMetricDataMap = Maps.newHashMap(); + /** + * The sub sink metric data container of sink metric data + */ + private final Map<String, SinkMetricData> subSinkMetricMap = Maps.newHashMap(); + + public SinkTableMetricData(MetricOption option, MetricGroup metricGroup) { + super(option, metricGroup); + } + + /** + * register sub sink metrics group from metric state + * + * @param metricState MetricState + */ + public void registerSubMetricsGroup(MetricState metricState) { + if (metricState == null) { + return; + } + // register sink read phase metric + Stream.of(ReadPhase.values()).forEach(readPhase -> { + Long readPhaseMetricValue = metricState.getMetricValue(readPhase.getPhase()); + if (readPhaseMetricValue > 0) { + outputReadPhaseMetrics(metricState, readPhase); + } + }); + + // register sub sink metric data + if (metricState.getSubMetricStateMap() == null) { + return; + } + Map<String, MetricState> subMetricStateMap = metricState.getSubMetricStateMap(); + for (Entry<String, MetricState> subMetricStateEntry : subMetricStateMap.entrySet()) { + String[] schemaInfoArray = parseSchemaIdentify(subMetricStateEntry.getKey()); + final MetricState subMetricState = subMetricStateEntry.getValue(); + SinkMetricData subSinkMetricData = buildSubSinkMetricData(schemaInfoArray, + subMetricState, this); + subSinkMetricMap.put(subMetricStateEntry.getKey(), subSinkMetricData); + } + LOGGER.info("register subMetricsGroup from metricState,sub metric map size:{}", subSinkMetricMap.size()); + } + + /** + * build sub sink metric data + * + * @param schemaInfoArray sink record schema info + * @param sinkMetricData sink metric data + * @return sub sink metric data + */ + private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, SinkMetricData sinkMetricData) { + return buildSubSinkMetricData(schemaInfoArray, null, sinkMetricData); + } + + /** + * build sub sink metric data + * + * @param schemaInfoArray the schema info array of record + * @param subMetricState sub metric state + * @param sinkMetricData sink metric data + * @return sub sink metric data + */ + private SinkMetricData buildSubSinkMetricData(String[] schemaInfoArray, MetricState subMetricState, + SinkMetricData sinkMetricData) { + if (sinkMetricData == null || schemaInfoArray == null) { + return null; + } + // build sub sink metric data + Map<String, String> labels = sinkMetricData.getLabels(); + String metricGroupLabels = labels.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining(DELIMITER)); + StringBuilder labelBuilder = new StringBuilder(metricGroupLabels); + labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0]) + .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]); + + MetricOption metricOption = MetricOption.builder() + .withInitRecords(subMetricState != null ? subMetricState.getMetricValue(NUM_RECORDS_IN) : 0L) + .withInitBytes(subMetricState != null ? subMetricState.getMetricValue(NUM_BYTES_IN) : 0L) + .withInlongLabels(labelBuilder.toString()) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + return new SinkTableMetricData(metricOption, sinkMetricData.getMetricGroup()); + } + + /** + * build record schema identify,in the form of database.table + * + * @param database the database name of record + * @param table the table name of record + * @return the record schema identify + */ + public String buildSchemaIdentify(String database, String table) { + return database + Constants.SEMICOLON + table; + } + + /** + * parse record schema identify + * + * @param schemaIdentify the schema identify of record + * @return the record schema identify array,String[]{database,table} + */ + public String[] parseSchemaIdentify(String schemaIdentify) { + return schemaIdentify.split(Constants.SPILT_SEMICOLON); + } + + /** + * output metrics with estimate + * + * @param database the database name of record + * @param table the table name of record + * @param isSnapshotRecord is it snapshot record + * @param data the data of record + */ + public void outputMetricsWithEstimate(String database, String table, boolean isSnapshotRecord, Object data) { + if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) { + outputMetricsWithEstimate(data); + return; + } + String identify = buildSchemaIdentify(database, table); + SinkMetricData subSinkMetricData; + if (subSinkMetricMap.containsKey(identify)) { + subSinkMetricData = subSinkMetricMap.get(identify); + } else { + subSinkMetricData = buildSubSinkMetricData(new String[]{database, table}, this); + subSinkMetricMap.put(identify, subSinkMetricData); + } + // sink metric and sub sink metric output metrics + long rowCountSize = 1L; + long rowDataSize = data.toString().getBytes(StandardCharsets.UTF_8).length; + this.invoke(rowCountSize, rowDataSize); + subSinkMetricData.invoke(rowCountSize, rowDataSize); + + // output read phase metric + outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE : ReadPhase.INCREASE_PHASE); + } + + /** + * output metrics with estimate + * + * @param database the database name of record + * @param table the table name of record + * @param isSnapshotRecord is it snapshot record + * @param rowCount the row count of records + * @param rowSize the row size of records + */ + public void outputMetricsWithEstimate(String database, String table, boolean isSnapshotRecord, long rowCount, + long rowSize) { + if (StringUtils.isBlank(database) || StringUtils.isBlank(table)) { + invoke(rowCount, rowSize); + return; + } + String identify = buildSchemaIdentify(database, table); + SinkMetricData subSinkMetricData; + if (subSinkMetricMap.containsKey(identify)) { + subSinkMetricData = subSinkMetricMap.get(identify); + } else { + subSinkMetricData = buildSubSinkMetricData(new String[]{database, table}, this); + subSinkMetricMap.put(identify, subSinkMetricData); + } + // sink metric and sub sink metric output metrics + this.invoke(rowCount, rowSize); + subSinkMetricData.invoke(rowCount, rowSize); + + // output read phase metric + outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE : ReadPhase.INCREASE_PHASE); + } + + public void outputMetricsWithEstimate(Object data) { + long size = data.toString().getBytes(StandardCharsets.UTF_8).length; + invoke(1, size); + } + + /** + * output read phase metric + * + * @param readPhase the readPhase of record + */ + public void outputReadPhaseMetrics(ReadPhase readPhase) { + outputReadPhaseMetrics(null, readPhase); + } + + /** + * output read phase metric + * + * @param metricState the metric state of record + * @param readPhase the readPhase of record + */ + public void outputReadPhaseMetrics(MetricState metricState, ReadPhase readPhase) { + ReadPhaseMetricData readPhaseMetricData = readPhaseMetricDataMap.get(readPhase); + if (readPhaseMetricData == null) { + // build read phase metric data + String metricGroupLabels = getLabels().entrySet().stream() + .map(entry -> entry.getKey() + "=" + entry.getValue()) + .collect(Collectors.joining(DELIMITER)); + + MetricOption metricOption = MetricOption.builder() + .withInitReadPhase(metricState != null ? metricState.getMetricValue(readPhase.getPhase()) : 0L) + .withInlongLabels(metricGroupLabels + DELIMITER + READ_PHASE + "=" + readPhase.getCode()) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + readPhaseMetricData = new ReadPhaseMetricData(metricOption, getMetricGroup()); + readPhaseMetricDataMap.put(readPhase, readPhaseMetricData); + } + readPhaseMetricData.outputMetrics(); + } + + @Override + public Map<ReadPhase, ReadPhaseMetricData> getReadPhaseMetricMap() { + return readPhaseMetricDataMap; + } + + @Override + public Map<String, SinkMetricData> getSubSinkMetricMap() { + return this.subSinkMetricMap; + } + + @Override + public String toString() { + return "SinkTableMetricData{" + + "readPhaseMetricDataMap=" + readPhaseMetricDataMap Review Comment: remove it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org