This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 501cecb0b [INLONG-7198][Sort] Support audit of Apache Hudi (#7181) 501cecb0b is described below commit 501cecb0bdd6e1e81866fc58a6190c8b52b1e7dc Author: feat <featzh...@outlook.com> AuthorDate: Tue Jan 10 15:36:43 2023 +0800 [INLONG-7198][Sort] Support audit of Apache Hudi (#7181) --- inlong-sort/sort-connectors/hudi/README.md | 22 +++ .../inlong/sort/hudi/metric/HudiAuditReporter.java | 173 +++++++++++++++++++++ .../inlong/sort/hudi/metric/HudiMetricsConfig.java | 134 ++++++++++++++++ .../inlong/sort/hudi/metric/HudiMetricsConst.java | 31 ++++ .../inlong/sort/hudi/metric/HudiMetricsUtil.java | 32 ++++ .../sort/hudi/metric/InLongHudiAuditReporter.java | 90 +++++++++++ .../sort/hudi/metric/HudiMetricsConfigTest.java | 165 ++++++++++++++++++++ 7 files changed, 647 insertions(+) diff --git a/inlong-sort/sort-connectors/hudi/README.md b/inlong-sort/sort-connectors/hudi/README.md new file mode 100644 index 000000000..6180ea858 --- /dev/null +++ b/inlong-sort/sort-connectors/hudi/README.md @@ -0,0 +1,22 @@ +# Hudi Sort connector + +## Metric reporter settings + +```property +hoodie.metrics.on=true +hoodie.metrics.reporter.class=org.apache.inlong.sort.hudi.metric.InLongHudiAuditReporter +hoodie.metrics.reporter.metricsname.prefix={custom metric name prefix} +hoodie.metrics.inlonghudi.report.period.seconds=30 +inlong.metric.labels={inlong metric label} +metrics.audit.proxy.hosts={inlong metric hosts} +``` + +| property | option | default value | docs | +|-------------------------------------------------|----------|-------------------------------------------------------|-------------------------------------------------------------------------------------------------------------| +| hoodie.metrics.on | required | false | must be 'true' | +| hoodie.metrics.reporter.class | required | org.apache.inlong.sort.hudi.metric.InLongHudiAuditReporter | must be 
'org.apache.inlong.sort.hudi.metric.InLongHudiAuditReporter' | +| hoodie.metrics.reporter.metricsname.prefix | optional | - | The prefix given to the metrics names. | +| hoodie.metrics.inlonghudi.report.period.seconds | optional | 30 | InLongHudi reporting period in seconds. Defaults to 30. | +| inlong.metric.labels | required | - | INLONG metric labels, format is 'key1=value1&key2=value2', default is 'groupId=xxx&streamId=xxx&nodeId=xxx' | +| metrics.audit.proxy.hosts | required | - | Audit proxy host address for reporting audit metrics. Multiple addresses are separated by '&', e.g. 127.0.0.1:10081&127.0.0.2:10081 | + diff --git a/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiAuditReporter.java b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiAuditReporter.java new file mode 100644 index 000000000..83f5b4d56 --- /dev/null +++ b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiAuditReporter.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.hudi.metric; + +import org.apache.flink.util.Preconditions; +import org.apache.hudi.com.codahale.metrics.Counter; +import org.apache.hudi.com.codahale.metrics.Gauge; +import org.apache.hudi.com.codahale.metrics.Histogram; +import org.apache.hudi.com.codahale.metrics.Meter; +import org.apache.hudi.com.codahale.metrics.MetricFilter; +import org.apache.hudi.com.codahale.metrics.MetricRegistry; +import org.apache.hudi.com.codahale.metrics.ScheduledReporter; +import org.apache.hudi.com.codahale.metrics.Timer; +import org.apache.inlong.audit.AuditOperator; +import org.apache.inlong.sort.base.Constants; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Optional; +import java.util.Set; +import java.util.SortedMap; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.inlong.sort.base.Constants.DELIMITER; +import static org.apache.inlong.sort.base.Constants.GROUP_ID; +import static org.apache.inlong.sort.base.Constants.STREAM_ID; +import static org.apache.inlong.sort.hudi.metric.HudiMetricsConst.ACTION_TYPES; +import static org.apache.inlong.sort.hudi.metric.HudiMetricsConst.METRIC_TOTAL_BYTES_WRITTEN; +import static org.apache.inlong.sort.hudi.metric.HudiMetricsConst.METRIC_TOTAL_RECORDS_WRITTEN; +import static org.apache.inlong.sort.hudi.metric.HudiMetricsUtil.getMetricsName; + +/** + * The schedule reporter for submit rowCount and rowSize of writing hudi + */ +public class HudiAuditReporter extends ScheduledReporter { + + private static final String IP_OR_HOST_PORT = "^(.*):([0-9]|[1-9]\\d|[1-9]\\d{" + + "2}|[1-9]\\d{" + + "3}|[1-5]\\d{" + + "4}|6[0-4]\\d{" + + "3}|65[0-4]\\d{" + + "2}|655[0-2]\\d|6553[0-5])$"; + private static final Logger LOG = LoggerFactory.getLogger(HudiAuditReporter.class); + private 
final LinkedHashMap<String, String> labels; + private Set<String> byteMetricNames; + private Set<String> recordMetricNames; + private AuditOperator auditOperator; + + protected HudiAuditReporter( + String inLongLabels, + String inLongAudit, + String metricNamePrefix, + MetricRegistry registry, + String name, + MetricFilter filter, + TimeUnit rateUnit, + TimeUnit durationUnit) { + super(registry, name, filter, rateUnit, durationUnit); + LOG.info("Create HudiAuditReporter, inLongLabels: {}, inLongAudit: {}, reportName: {}.", + inLongLabels, inLongAudit, name); + + labels = new LinkedHashMap<>(); + String[] inLongLabelArray = inLongLabels.split(DELIMITER); + Preconditions.checkArgument(Stream.of(inLongLabelArray).allMatch(label -> label.contains("=")), + "InLong metric label format must be xxx=xxx"); + Stream.of(inLongLabelArray).forEach(label -> { + String key = label.substring(0, label.indexOf('=')); + String value = label.substring(label.indexOf('=') + 1); + labels.put(key, value); + }); + String ipPorts = inLongAudit; + HashSet<String> ipPortList = new HashSet<>(); + + if (ipPorts != null) { + Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID), + "groupId and streamId must be set when enable inlong audit collect."); + String[] ipPortStrs = inLongAudit.split(DELIMITER); + for (String ipPort : ipPortStrs) { + Preconditions.checkArgument(Pattern.matches(IP_OR_HOST_PORT, ipPort), + "Error inLong audit format: " + inLongAudit); + ipPortList.add(ipPort); + } + } + + if (!ipPortList.isEmpty()) { + AuditOperator.getInstance().setAuditProxy(ipPortList); + auditOperator = AuditOperator.getInstance(); + } + + recordMetricNames = Arrays.stream(ACTION_TYPES) + .map(action -> getMetricsName(metricNamePrefix, action, METRIC_TOTAL_RECORDS_WRITTEN)) + .collect(Collectors.toSet()); + byteMetricNames = Arrays.stream(ACTION_TYPES) + .map(action -> getMetricsName(metricNamePrefix, action, METRIC_TOTAL_BYTES_WRITTEN)) + 
.collect(Collectors.toSet()); + } + + @Override + public void report( + SortedMap<String, Gauge> gaugeMap, + SortedMap<String, Counter> countMap, + SortedMap<String, Histogram> histogramMap, + SortedMap<String, Meter> meterMap, + SortedMap<String, Timer> timerMap) { + + if (auditOperator != null && !gaugeMap.isEmpty()) { + long rowCount = getGaugeValue(gaugeMap, recordMetricNames); + long rowSize = getGaugeValue(gaugeMap, byteMetricNames); + auditOperator.add( + Constants.AUDIT_SORT_OUTPUT, + getGroupId(), + getStreamId(), + System.currentTimeMillis(), + rowCount, + rowSize); + } + } + + private Long getGaugeValue( + SortedMap<String, Gauge> gaugeMap, + Set<String> metricNames) { + return metricNames + .stream() + .mapToLong(metricName -> getGaugeValue(gaugeMap, metricName)) + .sum(); + } + + private Long getGaugeValue( + SortedMap<String, Gauge> gaugeMap, + String metricName) { + return Optional.ofNullable(gaugeMap.get(metricName)) + .map(Gauge::getValue) + .map(v -> (Long) v) + .orElse(0L); + } + + public LinkedHashMap<String, String> getLabels() { + return labels; + } + + private String getStreamId() { + return getLabels().get(STREAM_ID); + } + + private String getGroupId() { + return getLabels().get(GROUP_ID); + } + + public boolean isReady() { + return auditOperator != null; + } +} diff --git a/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfig.java b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfig.java new file mode 100644 index 000000000..d5a3141e8 --- /dev/null +++ b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfig.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hudi.metric; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.hudi.common.config.ConfigClassProperty; +import org.apache.hudi.common.config.ConfigGroups; +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.Immutable; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.Properties; + +import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRIC_PREFIX; + +/** + * Configs for InLongHudi reporter type. + */ +@Immutable +@ConfigClassProperty(name = "Metrics Configurations for InLongHudi reporter", groupName = ConfigGroups.Names.METRICS, description = "Enables reporting on Hudi metrics using the InLongHudi reporter type. " + + "Hudi publishes metrics on every commit, clean, rollback etc.") +public class HudiMetricsConfig extends HoodieConfig { + + public static final Logger LOG = LoggerFactory.getLogger(HudiMetricsConfig.class); + public static final String METRIC_TYPE = "inlonghudi"; + + public static final String INLONG_HUDI_PREFIX = METRIC_PREFIX + "." 
+ METRIC_TYPE; + + public static final ConfigProperty<Integer> REPORT_PERIOD_IN_SECONDS = ConfigProperty + .key(INLONG_HUDI_PREFIX + ".report.period.seconds") + .defaultValue(30) + .sinceVersion("0.6.0") + .withDocumentation("InLongHudi reporting period in seconds. Default to 30."); + + /** + * Get config from props + */ + public static <T> T getConfig( + Properties props, + ConfigProperty<T> configProperty) { + return new GeneralConfig<T>(configProperty).getProperty(props); + } + + /** + * Get config from props + */ + public static <T> T getConfig( + Properties props, + ConfigOption<T> configProperty) { + return new GeneralConfig<T>(configProperty).getProperty(props); + } + + /** + * The until for parse config. + */ + public static class GeneralConfig<T> { + + private ConfigOption<T> option; + private ConfigProperty<T> config; + private Type type; + + public GeneralConfig(ConfigOption<T> configOption) { + this.option = configOption; + } + + public GeneralConfig(ConfigProperty<T> configProperty) { + this.config = configProperty; + } + + public String key() { + return option != null ? option.key() : config.key(); + } + + public T defaultValue() { + return option != null ? 
option.defaultValue() : config.defaultValue(); + } + + public T getProperty(Properties prop) { + String key = key(); + T defaultValue = defaultValue(); + Object o = prop.get(key); + if (o == null) { + return defaultValue; + } + Class<?> defaultValueClass = defaultValue.getClass(); + if (o.getClass().isAssignableFrom(defaultValueClass)) { + return (T) o; + } else { + String property = String.valueOf(o); + try { + return (T) parseValue(defaultValueClass, property); + } catch (InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException("Can not properly parse value:'" + property + + "' to '" + defaultValueClass + "'", e); + } + } + } + + private static <T> T parseValue( + Class<T> clazz, + String property) throws InvocationTargetException, IllegalAccessException { + Method valueOfMethod = null; + try { + valueOfMethod = clazz.getMethod("valueOf", String.class); + } catch (NoSuchMethodException e) { + LOG.error("Can not properly find 'valueOf' method of " + clazz, e); + throw new UnsupportedOperationException(); + } + return (T) valueOfMethod.invoke(null, property); + } + } + +} diff --git a/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConst.java b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConst.java new file mode 100644 index 000000000..bedc65978 --- /dev/null +++ b/inlong-sort/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConst.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * The constant values of hudi metric.
 *
 * <p>Pure constants holder: it is final and cannot be instantiated.
 */
public final class HudiMetricsConst {

    /** Metric name suffix for the number of records written. */
    public static final String METRIC_TOTAL_RECORDS_WRITTEN = "totalRecordsWritten";

    /** Metric name suffix for the number of bytes written. */
    public static final String METRIC_TOTAL_BYTES_WRITTEN = "totalBytesWritten";

    /**
     * Write actions whose metrics are collected.
     * Kept as an array for compatibility with existing callers; treat it as read-only.
     */
    public static final String[] ACTION_TYPES = {"insert", "upsert"};

    private HudiMetricsConst() {
        // constants only
    }

}
/**
 * Static helpers for working with hudi metric names.
 */
public final class HudiMetricsUtil {

    private HudiMetricsUtil() {
        // static helpers only
    }

    /**
     * Get metric name of hudi inner metrics.
     * {@link HoodieMetrics#updateCommitMetrics(long, long, HoodieCommitMetadata, String)}
     *
     * @param metricsNamePrefix prefix of the metric name
     * @param action            write action the metric belongs to
     * @param metric            name of the metric itself
     * @return the three parts joined with dots: {@code prefix.action.metric}
     */
    public static String getMetricsName(String metricsNamePrefix, String action, String metric) {
        return String.format("%s.%s.%s", metricsNamePrefix, action, metric);
    }
}
+ */ + +package org.apache.inlong.sort.hudi.metric; + +import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_PREFIX; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; +import static org.apache.inlong.sort.hudi.metric.HudiMetricsConfig.REPORT_PERIOD_IN_SECONDS; +import static org.apache.inlong.sort.hudi.metric.HudiMetricsConfig.getConfig; + +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.flink.util.StringUtils; +import org.apache.hudi.com.codahale.metrics.MetricFilter; +import org.apache.hudi.com.codahale.metrics.MetricRegistry; +import org.apache.hudi.metrics.custom.CustomizableMetricsReporter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The main entry of hudi audit reporter. + */ +public class InLongHudiAuditReporter extends CustomizableMetricsReporter { + + private static final Logger LOG = LoggerFactory.getLogger(InLongHudiAuditReporter.class); + + private HudiAuditReporter hudiAuditReporter; + private Integer reportPeriodSeconds; + + public InLongHudiAuditReporter(Properties props, MetricRegistry registry) { + super(props, registry); + + String inLongLabels = getConfig(props, INLONG_METRIC); + String inLongAudit = getConfig(props, INLONG_AUDIT); + String metricNamePrefix = getConfig(props, METRICS_REPORTER_PREFIX); + this.reportPeriodSeconds = getConfig(props, REPORT_PERIOD_IN_SECONDS); + + if (StringUtils.isNullOrWhitespaceOnly(inLongLabels)) { + LOG.error("Fatal error on create InLongHudiReporter, inLongLabels is empty!"); + return; + } + if (StringUtils.isNullOrWhitespaceOnly(inLongAudit)) { + LOG.error("Fatal error on create InLongHudiReporter, inLongAudit is empty!"); + return; + } + + this.hudiAuditReporter = new HudiAuditReporter( + inLongLabels, + inLongAudit, + metricNamePrefix, + registry, + "inlong-hudi-audit-reporter", + MetricFilter.ALL, + TimeUnit.SECONDS, + 
TimeUnit.SECONDS); + } + + @Override + public void start() { + if (hudiAuditReporter != null && hudiAuditReporter.isReady()) { + this.hudiAuditReporter.start(reportPeriodSeconds, TimeUnit.SECONDS); + } + } + + @Override + public void report() { + + } + + @Override + public void stop() { + + } +} diff --git a/inlong-sort/sort-connectors/hudi/src/test/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfigTest.java b/inlong-sort/sort-connectors/hudi/src/test/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfigTest.java new file mode 100644 index 000000000..588fa4d13 --- /dev/null +++ b/inlong-sort/sort-connectors/hudi/src/test/java/org/apache/inlong/sort/hudi/metric/HudiMetricsConfigTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.hudi.metric; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.hudi.common.config.ConfigProperty; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Properties; + +/** + * The UnitTest for {@link HudiMetricsConfig#getConfig(Properties, ConfigOption)} + * + */ +public class HudiMetricsConfigTest { + + @Test + public void testGetConfigByConfigProperty() { + Properties props = new Properties(); + + String stringPropKey = "string_prop"; + String stringPropDefaultValue = "default_string_value"; + String stringPropExpectedValue = "sample_value"; + + ConfigProperty<String> stringProp = + ConfigProperty.key(stringPropKey) + .defaultValue(stringPropDefaultValue); + + testGetValueByConfig( + props, + stringProp, + stringPropKey, + stringPropDefaultValue, + stringPropExpectedValue); + + String intPropKey = "int_prop"; + int intPropDefaultValue = 1; + int intPropExpectedValue = 3; + ConfigProperty<Integer> intProp = + ConfigProperty.key(intPropKey) + .defaultValue(intPropDefaultValue); + testGetValueByConfig(props, intProp, intPropKey, intPropDefaultValue, intPropExpectedValue); + + String longPropKey = "long_prop"; + long longPropDefaultValue = 1L; + long longPropExpectedValue = 1000L; + ConfigProperty<Long> longProp = + ConfigProperty.key(longPropKey) + .defaultValue(longPropDefaultValue); + testGetValueByConfig(props, longProp, longPropKey, longPropDefaultValue, longPropExpectedValue); + + String floatPropKey = "float_prop"; + float floatPropDefaultValue = 3.14f; + ConfigProperty<Float> floatProp = + ConfigProperty.key(floatPropKey) + .defaultValue(floatPropDefaultValue); + float floatExpectedValue = 1000.0f; + testGetValueByConfig(props, floatProp, floatPropKey, floatPropDefaultValue, floatExpectedValue); + + String doublePropKey = "double_prop"; + double doublePropDefaultValue = 3.14159265358979; + double doublePropExpectedValue = 
1000.1000; + ConfigProperty<Double> doubleProp = + ConfigProperty.key(doublePropKey) + .defaultValue(doublePropDefaultValue); + testGetValueByConfig(props, doubleProp, doublePropKey, doublePropDefaultValue, doublePropExpectedValue); + } + + private <T> void testGetValueByConfig( + Properties props, + ConfigProperty<T> property, + String key, + T defaultValue, + T expectedValue) { + T value = HudiMetricsConfig.getConfig(props, property); + Assert.assertEquals(value, defaultValue); + + props.put(key, expectedValue); + value = HudiMetricsConfig.getConfig(props, property); + Assert.assertEquals(expectedValue, value); + } + + private <T> void testGetValueByConfig( + Properties props, + ConfigOption<T> option, + String key, + T defaultValue, + T expectedValue) { + T value = HudiMetricsConfig.getConfig(props, option); + Assert.assertEquals(value, defaultValue); + + props.put(key, expectedValue); + value = HudiMetricsConfig.getConfig(props, option); + Assert.assertEquals(expectedValue, value); + } + + @Test + public void testGetConfigByConfigOption() { + Properties props = new Properties(); + + String stringPropKey = "string_prop"; + String stringPropDefaultValue = "default_string_value"; + String stringPropExpectedValue = "sample_value"; + + ConfigOption<String> stringProp = + ConfigOptions.key(stringPropKey) + .defaultValue(stringPropDefaultValue); + + testGetValueByConfig( + props, + stringProp, + stringPropKey, + stringPropDefaultValue, + stringPropExpectedValue); + + String intPropKey = "int_prop"; + int intPropDefaultValue = 1; + int intPropExpectedValue = 3; + ConfigOption<Integer> intProp = + ConfigOptions.key(intPropKey) + .defaultValue(intPropDefaultValue); + testGetValueByConfig(props, intProp, intPropKey, intPropDefaultValue, intPropExpectedValue); + + String longPropKey = "long_prop"; + long longPropDefaultValue = 1L; + long longPropExpectedValue = 1000L; + ConfigOption<Long> longProp = + ConfigOptions.key(longPropKey) + .defaultValue(longPropDefaultValue); + 
testGetValueByConfig(props, longProp, longPropKey, longPropDefaultValue, longPropExpectedValue); + + String floatPropKey = "float_prop"; + float floatPropDefaultValue = 3.14f; + ConfigOption<Float> floatProp = + ConfigOptions.key(floatPropKey) + .defaultValue(floatPropDefaultValue); + float floatExpectedValue = 1000.0f; + testGetValueByConfig(props, floatProp, floatPropKey, floatPropDefaultValue, floatExpectedValue); + + String doublePropKey = "double_prop"; + double doublePropDefaultValue = 3.14159265358979; + double doublePropExpectedValue = 1000.1000; + ConfigOption<Double> doubleProp = + ConfigOptions.key(doublePropKey) + .defaultValue(doublePropDefaultValue); + testGetValueByConfig(props, doubleProp, doublePropKey, doublePropDefaultValue, doublePropExpectedValue); + } +}