This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 77cd4762f7 [#9984] feat(optimizer): add monitor implementation
providers and evaluator (#10104)
77cd4762f7 is described below
commit 77cd4762f7c03e55ec134f65c780eb04ebb3b9dd
Author: FANNG <[email protected]>
AuthorDate: Mon Mar 2 12:47:13 2026 +0900
[#9984] feat(optimizer): add monitor implementation providers and evaluator
(#10104)
## What changed are added
### 1. Added monitor implementations
- Added GravitinoMetricsEvaluator
- Rule-based evaluation with aggregations: max, min, avg, latest
- Scoped rules: table.<metric> and job.<metric>
- Partition scope is evaluated using table-scoped rules
- Added GravitinoMetricsProvider
- Added local file-based table-job relation provider:
- LocalTableJobRelationProvider
- LocalTableJobRelationReader for JSON-lines table -> jobs mapping
### 2. Service discovery wiring
- Updated META-INF/services registrations for monitor provider/evaluator
loading (main and test resources)
### 3. Behavior and robustness improvements
- Extracted evaluator rule parsing into dedicated initialization logic
- Better validation and error messages for local relation file
configuration/
path
- Added explicit initialization guards for provider usage
- Improved read failure diagnostics to include file path and target
table
identifier
- Logging polish:
- Evaluator per-metric skip logs lowered to DEBUG
- Keep evaluator summary at INFO
- Removed duplicated utility-level parse WARN logs from IdentifierUtils
(callers log context)
### 4. Naming/semantic cleanup
- Renamed monitor job-provider abstractions to table-job relation
terminology:
- JobProvider -> TableJobRelationProvider
- LocalJobProvider -> LocalTableJobRelationProvider
- FileJobReader -> LocalTableJobRelationReader
- DummyJobProvider -> DummyTableJobRelationProvider
- Updated related method/field names
(createTableJobRelationProviderInstance,
etc.)
## Why this change is needed
- Makes monitor module production-usable instead of interface-only
- Provides configurable, scope-aware, rule-based evaluation
- Improves diagnostics and runtime safety for local file-based relation
loading
- Reduces log noise and duplicate warnings
- Clarifies semantics for future extensibility of table-job relation
sources
Fixes: #9984
## User-facing changes
Yes.
### New/updated provider/evaluator names
- gravitino-metrics-evaluator
- gravitino-metrics-provider
- local-table-job-relation-provider (renamed)
### Rule config
- optimizer.monitor.gravitino-metrics-evaluator.rules
### Property key update (renamed)
- from: optimizer.monitor.local-job-provider.file-path (or old
file-provider
variant)
- to: optimizer.monitor.local-table-job-relation-provider.filePath
### Rule semantics
- Aggregations: max, min, avg, latest
- Scopes: table, job
- Partition metrics use table rules
## Testing
Executed Spotless and targeted optimizer tests for evaluator/provider/
relation-provider/monitor flows.
./gradlew :maintenance:optimizer:spotlessApply
IT_SPARK_HOME=/Users/fanng/deploy/demo/spark-3.5.3-bin-hadoop3 \
IT_SPARK_ARGS='--conf
spark.jars=/Users/fanng/deploy/demo/jars/iceberg-spark-
runtime-3.5_2.12-1.9.2.jar,/Users/fanng/deploy/demo/jars/iceberg-aws-bundle-
1.9.2.jar --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionEx
tensions --conf
spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog
--conf spark.sql.catalog.rest.type=rest --conf
spark.sql.catalog.rest.uri=http://127.0.0.1:9001/iceberg/ --conf
spark.sql.catalog.rest.header.X-Iceberg-Access-Delegation=vended-credentials'
\
IT_TABLE_IDENTIFIER=rest.ab.a1 \
GRAVITINO_ENV_IT=true \
./gradlew :maintenance:optimizer:test \
--tests "org.apache.gravitino.maintenance.optimizer.monitor.TestMonitor"
\
--tests
"org.apache.gravitino.maintenance.optimizer.monitor.evaluator.TestGravitinoMet
ricsEvaluator" \
--tests
"org.apache.gravitino.maintenance.optimizer.monitor.metrics.TestGravitinoMetri
csProvider" \
--tests
"org.apache.gravitino.maintenance.optimizer.monitor.job.TestLocalTableJobRelat
ionProvider" \
-PskipIT=true
---
.../optimizer/api/monitor/MetricsEvaluator.java | 9 +
...Provider.java => TableJobRelationProvider.java} | 2 +-
.../optimizer/common/conf/OptimizerConfig.java | 22 +-
.../optimizer/common/util/IdentifierUtils.java | 63 ++-
.../optimizer/common/util/ProviderUtils.java | 6 +-
.../maintenance/optimizer/monitor/Monitor.java | 25 +-
.../monitor/callback/ConsoleMonitorCallback.java | 92 ++++
.../evaluator/GravitinoMetricsEvaluator.java | 500 +++++++++++++++++++++
.../job/dummy/DummyTableJobRelationProvider.java} | 35 +-
.../job/local/LocalTableJobRelationProvider.java | 108 +++++
.../job/local/LocalTableJobRelationReader.java | 140 ++++++
.../monitor/metrics/GravitinoMetricsProvider.java | 149 ++++++
.../local/AbstractStatisticsImporter.java | 76 ++--
...itino.maintenance.optimizer.api.common.Provider | 4 +
...tenance.optimizer.api.monitor.MetricsEvaluator} | 6 +-
.../optimizer/common/util/TestIdentifierUtils.java | 63 +++
.../optimizer/common/util/TestProviderUtils.java | 21 +-
.../maintenance/optimizer/monitor/TestMonitor.java | 50 ++-
.../monitor/evaluator/MetricsEvaluatorForTest.java | 4 +-
.../evaluator/TestGravitinoMetricsEvaluator.java | 460 +++++++++++++++++++
...t.java => TableJobRelationProviderForTest.java} | 6 +-
.../job/TestLocalTableJobRelationProvider.java | 102 +++++
.../monitor/metrics/MetricsProviderForTest.java | 6 +-
.../metrics/TestGravitinoMetricsProvider.java | 122 +++++
...itino.maintenance.optimizer.api.common.Provider | 2 +-
25 files changed, 1968 insertions(+), 105 deletions(-)
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
index f383ef6169..166222da7a 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
@@ -23,15 +23,24 @@ import java.util.List;
import java.util.Map;
import org.apache.gravitino.annotation.DeveloperApi;
import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
/**
* Evaluator interface for the table and related job metrics before and after
optimization actions.
*/
@DeveloperApi
public interface MetricsEvaluator {
+
/** Human-readable evaluator name, primarily for logging and selection. */
String name();
+ /**
+ * Optional initialization hook for evaluators that need runtime
configuration.
+ *
+ * @param optimizerEnv shared optimizer environment/configuration
+ */
+ default void initialize(OptimizerEnv optimizerEnv) {}
+
/**
* Evaluate metrics before/after optimization to decide success/failure.
*
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java
similarity index 95%
rename from
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
rename to
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java
index 8b4e825582..3fc60d2bfd 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java
@@ -26,7 +26,7 @@ import
org.apache.gravitino.maintenance.optimizer.api.common.Provider;
/** Represents a provider that provides upstream and downstream jobs for a
table. */
@DeveloperApi
-public interface JobProvider extends Provider {
+public interface TableJobRelationProvider extends Provider {
/**
* List jobs related to the provided table.
*
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index ee22f46ead..5d64378ff2 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -26,6 +26,9 @@ import org.apache.gravitino.Config;
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
+import
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.GravitinoMetricsEvaluator;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider;
+import
org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider;
import
org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter;
import
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
import
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider;
@@ -58,9 +61,10 @@ public class OptimizerConfig extends Config {
public static final String UPDATER_PREFIX = OPTIMIZER_PREFIX + "updater.";
private static final String STATISTICS_UPDATER = UPDATER_PREFIX +
"statisticsUpdater";
private static final String METRICS_UPDATER = UPDATER_PREFIX +
"metricsUpdater";
- private static final String MONITOR_PREFIX = OPTIMIZER_PREFIX + "monitor.";
+ public static final String MONITOR_PREFIX = OPTIMIZER_PREFIX + "monitor.";
private static final String METRICS_PROVIDER = MONITOR_PREFIX +
"metricsProvider";
- private static final String JOB_PROVIDER = MONITOR_PREFIX + "jobProvider";
+ private static final String TABLE_JOB_RELATION_PROVIDER =
+ MONITOR_PREFIX + "tableJobRelationProvider";
private static final String METRICS_EVALUATOR = MONITOR_PREFIX +
"metricsEvaluator";
private static final String MONITOR_CALLBACKS = MONITOR_PREFIX + "callbacks";
@@ -129,16 +133,16 @@ public class OptimizerConfig extends Config {
+ "discoverable via ServiceLoader. Example:
'metrics-provider'.")
.version(ConfigConstants.VERSION_1_2_0)
.stringConf()
- .create();
+ .createWithDefault(GravitinoMetricsProvider.NAME);
- public static final ConfigEntry<String> JOB_PROVIDER_CONFIG =
- new ConfigBuilder(JOB_PROVIDER)
+ public static final ConfigEntry<String> TABLE_JOB_RELATION_PROVIDER_CONFIG =
+ new ConfigBuilder(TABLE_JOB_RELATION_PROVIDER)
.doc(
- "Monitor job provider implementation name (matches
Provider.name()) discoverable "
- + "via ServiceLoader. Example: 'job-provider'.")
+ "Monitor table-job relation provider implementation name
(matches Provider.name()) "
+ + "discoverable via ServiceLoader. Example:
'table-job-relation-provider'.")
.version(ConfigConstants.VERSION_1_2_0)
.stringConf()
- .create();
+ .createWithDefault(DummyTableJobRelationProvider.NAME);
public static final ConfigEntry<String> METRICS_EVALUATOR_CONFIG =
new ConfigBuilder(METRICS_EVALUATOR)
@@ -148,7 +152,7 @@ public class OptimizerConfig extends Config {
+ "'metrics-evaluator'.")
.version(ConfigConstants.VERSION_1_2_0)
.stringConf()
- .create();
+ .createWithDefault(GravitinoMetricsEvaluator.NAME);
public static final ConfigEntry<List<String>> MONITOR_CALLBACKS_CONFIG =
new ConfigBuilder(MONITOR_CALLBACKS)
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
index 415e3b5634..761db7c891 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
@@ -20,12 +20,13 @@
package org.apache.gravitino.maintenance.optimizer.common.util;
import com.google.common.base.Preconditions;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
/** Utilities for working with fully qualified table identifiers. */
public class IdentifierUtils {
-
private static final String NORMALIZED_IDENTIFIER_MESSAGE =
"Identifier must be catalog.schema.table";
@@ -71,4 +72,64 @@ public class IdentifierUtils {
Preconditions.checkArgument(
namespace != null && namespace.levels().length == 2,
NORMALIZED_IDENTIFIER_MESSAGE);
}
+
+ /**
+ * Parse table identifier text.
+ *
+ * <p>Accepted forms:
+ *
+ * <ul>
+ * <li>{@code schema.table}, when default catalog is configured
+ * <li>{@code catalog.schema.table}
+ * </ul>
+ *
+ * @param identifierText raw identifier text
+ * @param defaultCatalogName optional default catalog for {@code
schema.table}
+ * @return parsed identifier
+ */
+ public static Optional<NameIdentifier> parseTableIdentifier(
+ String identifierText, String defaultCatalogName) {
+ if (StringUtils.isBlank(identifierText)) {
+ return Optional.empty();
+ }
+
+ try {
+ NameIdentifier parsed = NameIdentifier.parse(identifierText);
+ int levels = parsed.namespace().levels().length;
+ if (levels == 0) {
+ return Optional.empty();
+ }
+ if (levels == 1) {
+ if (StringUtils.isNotBlank(defaultCatalogName)) {
+ return Optional.of(
+ NameIdentifier.of(defaultCatalogName,
parsed.namespace().levels()[0], parsed.name()));
+ }
+ return Optional.empty();
+ }
+ if (levels == 2) {
+ return Optional.of(parsed);
+ }
+ return Optional.empty();
+ } catch (Exception e) {
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Parse job identifier text.
+ *
+ * @param identifierText raw identifier text
+ * @return parsed identifier
+ */
+ public static Optional<NameIdentifier> parseJobIdentifier(String
identifierText) {
+ if (StringUtils.isBlank(identifierText)) {
+ return Optional.empty();
+ }
+
+ try {
+ return Optional.of(NameIdentifier.parse(identifierText));
+ } catch (Exception e) {
+ return Optional.empty();
+ }
+ }
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
index bd63e389b0..71b7d36e04 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
@@ -25,9 +25,9 @@ import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
-import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
+import
org.apache.gravitino.maintenance.optimizer.api.monitor.TableJobRelationProvider;
import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
@@ -107,8 +107,8 @@ public class ProviderUtils {
return createProviderInstance(MetricsProvider.class, provider);
}
- public static JobProvider createJobProviderInstance(String provider) {
- return createProviderInstance(JobProvider.class, provider);
+ public static TableJobRelationProvider
createTableJobRelationProviderInstance(String provider) {
+ return createProviderInstance(TableJobRelationProvider.class, provider);
}
public static MonitorCallback createMonitorCallbackInstance(String provider)
{
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
index ebf88358f0..5f01615bb2 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
@@ -31,11 +31,11 @@ import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
-import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
+import
org.apache.gravitino.maintenance.optimizer.api.monitor.TableJobRelationProvider;
import org.apache.gravitino.maintenance.optimizer.common.CloseableGroup;
import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
@@ -61,7 +61,8 @@ import org.slf4j.LoggerFactory;
*
* <ul>
* <li>{@link OptimizerConfig#METRICS_PROVIDER_CONFIG} for {@link
MetricsProvider}.
- * <li>{@link OptimizerConfig#JOB_PROVIDER_CONFIG} for {@link JobProvider}.
+ * <li>{@link OptimizerConfig#TABLE_JOB_RELATION_PROVIDER_CONFIG} for {@link
+ * TableJobRelationProvider}.
* <li>{@link OptimizerConfig#METRICS_EVALUATOR_CONFIG} for {@link
MetricsEvaluator}.
* <li>{@link OptimizerConfig#MONITOR_CALLBACKS_CONFIG} for callback list.
* </ul>
@@ -82,7 +83,7 @@ import org.slf4j.LoggerFactory;
* <li>Resolve a time range: {@code [actionTimeSeconds - rangeSeconds,
actionTimeSeconds +
* rangeSeconds]}.
* <li>Read table/partition metrics and evaluate them.
- * <li>Resolve related jobs from {@link JobProvider} and evaluate each job's
metrics.
+ * <li>Resolve related jobs from {@link TableJobRelationProvider} and
evaluate each job's metrics.
* <li>Return ordered results (table first, then jobs).
* </ol>
*/
@@ -90,7 +91,7 @@ public class Monitor implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(Monitor.class);
private final MetricsProvider metricsProvider;
- private final JobProvider jobProvider;
+ private final TableJobRelationProvider tableJobRelationProvider;
private final MetricsEvaluator metricsEvaluator;
private final List<MonitorCallback> callbacks;
private final CloseableGroup closeableGroup = new CloseableGroup();
@@ -107,11 +108,13 @@ public class Monitor implements AutoCloseable {
metricsProvider.initialize(optimizerEnv);
closeableGroup.register(metricsProvider,
MetricsProvider.class.getSimpleName());
- this.jobProvider = loadJobProvider(optimizerEnv.config());
- jobProvider.initialize(optimizerEnv);
- closeableGroup.register(jobProvider, JobProvider.class.getSimpleName());
+ this.tableJobRelationProvider =
loadTableJobRelationProvider(optimizerEnv.config());
+ tableJobRelationProvider.initialize(optimizerEnv);
+ closeableGroup.register(
+ tableJobRelationProvider,
TableJobRelationProvider.class.getSimpleName());
this.metricsEvaluator = loadMetricsEvaluator(optimizerEnv.config());
+ metricsEvaluator.initialize(optimizerEnv);
this.callbacks = loadCallbacks(optimizerEnv.config());
for (MonitorCallback callback : callbacks) {
callback.initialize(optimizerEnv);
@@ -143,7 +146,7 @@ public class Monitor implements AutoCloseable {
results.add(
evaluateTableMetrics(
metricsEvaluator, tableIdentifier, actionTimeSeconds,
rangeSeconds, partitionPath));
- List<NameIdentifier> jobs = jobProvider.jobIdentifiers(tableIdentifier);
+ List<NameIdentifier> jobs =
tableJobRelationProvider.jobIdentifiers(tableIdentifier);
if (jobs == null) {
jobs = List.of();
}
@@ -269,9 +272,9 @@ public class Monitor implements AutoCloseable {
optimizerConfig.get(OptimizerConfig.METRICS_PROVIDER_CONFIG));
}
- private JobProvider loadJobProvider(OptimizerConfig optimizerConfig) {
- return ProviderUtils.createJobProviderInstance(
- optimizerConfig.get(OptimizerConfig.JOB_PROVIDER_CONFIG));
+ private TableJobRelationProvider
loadTableJobRelationProvider(OptimizerConfig optimizerConfig) {
+ return ProviderUtils.createTableJobRelationProviderInstance(
+
optimizerConfig.get(OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG));
}
private MetricsEvaluator loadMetricsEvaluator(OptimizerConfig
optimizerConfig) {
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/ConsoleMonitorCallback.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/ConsoleMonitorCallback.java
new file mode 100644
index 0000000000..2bb2af4acd
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/ConsoleMonitorCallback.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.callback;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+/** Built-in callback that prints evaluation results and metric summaries to
console. */
+public class ConsoleMonitorCallback implements MonitorCallback {
+
+ public static final String NAME = "console";
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {}
+
+ @Override
+ public void onEvaluation(EvaluationResult result) {
+ System.out.println(
+ String.format(
+ "MONITOR: time=%s scope=%s identifier=%s%s evaluation=%s
evaluator=%s",
+ Instant.now(),
+ result.scope().type(),
+ result.scope().identifier(),
+ result.scope().partition().isPresent()
+ ? " partition=" + result.scope().partition().get()
+ : "",
+ result.evaluation(),
+ result.evaluatorName()));
+ if (!result.beforeMetrics().isEmpty()) {
+ System.out.println("METRICS BEFORE: " +
formatMetrics(result.beforeMetrics()));
+ }
+ if (!result.afterMetrics().isEmpty()) {
+ System.out.println("METRICS AFTER: " +
formatMetrics(result.afterMetrics()));
+ }
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ private String formatMetrics(Map<String, List<MetricSample>> metrics) {
+ if (metrics == null || metrics.isEmpty()) {
+ return "{}";
+ }
+ return metrics.entrySet().stream()
+ .map(entry -> entry.getKey() + "=" +
formatMetricSamples(entry.getValue()))
+ .collect(Collectors.joining(", ", "{", "}"));
+ }
+
+ private String formatMetricSamples(List<MetricSample> samples) {
+ if (samples == null || samples.isEmpty()) {
+ return "[]";
+ }
+ return samples.stream()
+ .map(sample -> sample.timestamp() + ":" + formatMetricValue(sample))
+ .collect(Collectors.joining(", ", "[", "]"));
+ }
+
+ private String formatMetricValue(MetricSample sample) {
+ if (sample == null || sample.statistic() == null ||
sample.statistic().value() == null) {
+ return "N/A";
+ }
+ return String.valueOf(sample.statistic().value().value());
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/GravitinoMetricsEvaluator.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/GravitinoMetricsEvaluator.java
new file mode 100644
index 0000000000..6c5798108f
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/GravitinoMetricsEvaluator.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.evaluator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Built-in {@link MetricsEvaluator} for comparing before/after metrics with
configurable rules.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ * <li>Set {@link OptimizerConfig#METRICS_EVALUATOR_CONFIG} to {@value
#NAME}.
+ * <li>Configure {@link #EVALUATION_RULES_CONFIG} with comma-separated rule
entries.
+ * </ul>
+ *
+ * <p>Rule format:
+ *
+ * <ul>
+ * <li>{@code scope:metricName:aggregation:comparison}
+ * <li>{@code scope}: {@code table|job}
+ * <li>{@code aggregation}: {@code max|min|avg|latest}
+ * <li>{@code comparison}: {@code lt|le|gt|ge|eq|ne}
+ * </ul>
+ *
+ * <p>Evaluation semantics:
+ *
+ * <ul>
+ * <li>Each rule selects one metric and computes aggregated {@code before}
and {@code after}
+ * values.
+ * <li>The configured comparison is applied as {@code compare(after,
before)}.
+ * <li>Partition scopes reuse table rules.
+ * <li>Rules with insufficient/non-aggregatable samples are skipped.
+ * <li>The overall result is pass when no rule fails.
+ * </ul>
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * gravitino.optimizer.monitor.metricsEvaluator = gravitino-metrics-evaluator
+ * gravitino.optimizer.monitor.gravitinoMetricsEvaluator.rules =
+ * table:row_count:avg:le,job:duration:latest:le
+ * }</pre>
+ */
+public class GravitinoMetricsEvaluator implements MetricsEvaluator {
+
+ public static final String NAME = "gravitino-metrics-evaluator";
+ public static final String CONFIG_NAME = "gravitinoMetricsEvaluator";
+
+ /**
+ * Comma-separated evaluation rules in format: {@code
scope:metricName:aggregation:comparison}.
+ * Supported scope values are {@code table|job}. Supported aggregation
values are {@code
+ * max|min|avg|latest}. Supported comparison values are {@code
lt|le|gt|ge|eq|ne}. Table rules are
+ * applied to both table and partition metric scopes.
+ */
+ public static final String EVALUATION_RULES_CONFIG =
+ OptimizerConfig.MONITOR_PREFIX + CONFIG_NAME + ".rules";
+
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoMetricsEvaluator.class);
+
+ private Map<RuleScope, Map<String, RuleConfig>> metricRulesByScope =
Map.of();
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {
+ String rawRules =
optimizerEnv.config().getRawString(EVALUATION_RULES_CONFIG);
+ this.metricRulesByScope = parseEvaluationRules(rawRules);
+ if (metricRulesByScope.isEmpty()) {
+ LOG.warn(
+ "No evaluator rules configured in {}. All evaluations will pass
until rules are set.",
+ EVALUATION_RULES_CONFIG);
+ return;
+ }
+ LOG.info(
+ "Loaded {} scoped metric evaluation rules for {}: {}",
+ ruleCount(metricRulesByScope),
+ NAME,
+ formatRules(metricRulesByScope));
+ }
+
+ /**
+ * Parse evaluator rules from raw config text.
+ *
+ * <p>Rule format: {@code scope:metricName:aggregation:comparison},
separated by commas.
+ *
+ * @param rawRules raw rule text from config
+ * @return immutable scoped rule map, or empty map when input is blank
+ */
+ static Map<RuleScope, Map<String, RuleConfig>> parseEvaluationRules(String
rawRules) {
+ if (StringUtils.isBlank(rawRules)) {
+ return Map.of();
+ }
+
+ Map<RuleScope, Map<String, RuleConfig>> parsedRules = new
LinkedHashMap<>();
+ String[] entries = rawRules.split(",");
+ for (String entry : entries) {
+ String trimmed = entry == null ? "" : entry.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+
+ String[] parts = trimmed.split(":", -1);
+ if (parts.length != 4) {
+ throw new IllegalArgumentException(
+ "Invalid evaluator rule '"
+ + trimmed
+ + "'. Expected format is
scope:metricName:aggregation:comparison");
+ }
+
+ String scopeText = parts[0].trim();
+ String metricText = parts[1].trim();
+ String aggregationText = parts[2].trim();
+ String comparisonText = parts[3].trim();
+
+ RuleScope scope = RuleScope.fromConfigValue(scopeText);
+ String metricName = normalizeMetricName(metricText);
+ if (metricName.isEmpty()) {
+ throw new IllegalArgumentException("Metric name must not be blank in
rule: " + trimmed);
+ }
+ AggregationOp aggregation = AggregationOp.from(aggregationText);
+ ComparisonOp comparison = ComparisonOp.from(comparisonText);
+
+ Map<String, RuleConfig> scopeRules =
+ parsedRules.computeIfAbsent(scope, ignored -> new LinkedHashMap<>());
+ RuleConfig previous =
+ scopeRules.putIfAbsent(metricName, new RuleConfig(aggregation,
comparison));
+ if (previous != null) {
+ throw new IllegalArgumentException(
+ "Duplicate metric rule for '"
+ + scope.configValue
+ + ":"
+ + metricName
+ + "' in "
+ + EVALUATION_RULES_CONFIG);
+ }
+ }
+
+ if (parsedRules.isEmpty()) {
+ return Map.of();
+ }
+
+ Map<RuleScope, Map<String, RuleConfig>> immutableRules = new
LinkedHashMap<>();
+ for (Map.Entry<RuleScope, Map<String, RuleConfig>> entry :
parsedRules.entrySet()) {
+ immutableRules.put(entry.getKey(),
Collections.unmodifiableMap(entry.getValue()));
+ }
+ return Collections.unmodifiableMap(immutableRules);
+ }
+
+ @Override
+ public boolean evaluateMetrics(
+ MetricScope scope,
+ Map<String, List<MetricSample>> beforeMetrics,
+ Map<String, List<MetricSample>> afterMetrics) {
+ if (metricRulesByScope.isEmpty()) {
+ return true;
+ }
+
+ RuleScope ruleScope = RuleScope.fromMetricScopeType(scope.type());
+ Map<String, RuleConfig> rules = metricRulesByScope.getOrDefault(ruleScope,
Map.of());
+ if (rules.isEmpty()) {
+ LOG.info(
+ "No evaluator rules configured for scope={}, identifier={}, skip
evaluation",
+ scope.type(),
+ scope.identifier());
+ return true;
+ }
+
+ Map<String, List<MetricSample>> before = beforeMetrics == null ? Map.of()
: beforeMetrics;
+ Map<String, List<MetricSample>> after = afterMetrics == null ? Map.of() :
afterMetrics;
+
+ int evaluatedCount = 0;
+ int failedCount = 0;
+ int skippedCount = 0;
+ for (Map.Entry<String, RuleConfig> rule : rules.entrySet()) {
+ evaluatedCount++;
+ String metricName = rule.getKey();
+ RuleConfig ruleConfig = rule.getValue();
+ AggregationOp aggregation = ruleConfig.aggregation;
+ ComparisonOp comparison = ruleConfig.comparison;
+
+ List<MetricSample> beforeSamples =
sanitizeSamples(findMetricSamples(before, metricName));
+ List<MetricSample> afterSamples =
sanitizeSamples(findMetricSamples(after, metricName));
+ if (beforeSamples.isEmpty() || afterSamples.isEmpty()) {
+ LOG.debug(
+ "Metric {} of {} ({}) has insufficient samples: beforeSize={},
afterSize={}",
+ metricName,
+ scope.identifier(),
+ scope.type(),
+ beforeSamples.size(),
+ afterSamples.size());
+ skippedCount++;
+ continue;
+ }
+
+ OptionalDouble beforeValue = aggregate(beforeSamples, aggregation);
+ OptionalDouble afterValue = aggregate(afterSamples, aggregation);
+ if (beforeValue.isEmpty() || afterValue.isEmpty()) {
+ LOG.debug(
+ "Metric {} of {} ({}) cannot be aggregated by {}",
+ metricName,
+ scope.identifier(),
+ scope.type(),
+ aggregation.configValue);
+ skippedCount++;
+ continue;
+ }
+
+ double beforeNumber = beforeValue.getAsDouble();
+ double afterNumber = afterValue.getAsDouble();
+ boolean passed = comparison.test(afterNumber, beforeNumber);
+ LOG.debug(
+ "Metric {} of {} ({}) using {}:{}: before={}, after={}, passed={}",
+ metricName,
+ scope.identifier(),
+ scope.type(),
+ aggregation.configValue,
+ comparison.configValue,
+ beforeNumber,
+ afterNumber,
+ passed);
+ if (!passed) {
+ failedCount++;
+ }
+ }
+
+ boolean allPassed = failedCount == 0;
+ LOG.info(
+ "Evaluated {} metrics for {} ({}), passed={}, failed={}, skipped={}",
+ evaluatedCount,
+ scope.identifier(),
+ scope.type(),
+ allPassed,
+ failedCount,
+ skippedCount);
+ return allPassed;
+ }
+
+ private static int ruleCount(Map<RuleScope, Map<String, RuleConfig>>
rulesByScope) {
+ return rulesByScope.values().stream().mapToInt(Map::size).sum();
+ }
+
+ private static String formatRules(Map<RuleScope, Map<String, RuleConfig>>
rulesByScope) {
+ return rulesByScope.entrySet().stream()
+ .flatMap(
+ scopeEntry ->
+ scopeEntry.getValue().entrySet().stream()
+ .map(
+ metricEntry ->
+ scopeEntry.getKey().configValue
+ + ":"
+ + metricEntry.getKey()
+ + ":"
+ +
metricEntry.getValue().aggregation.configValue
+ + ":"
+ +
metricEntry.getValue().comparison.configValue))
+ .collect(Collectors.joining(", "));
+ }
+
+ private static List<MetricSample> sanitizeSamples(List<MetricSample>
samples) {
+ return samples == null ? List.of() : samples;
+ }
+
+ private static List<MetricSample> findMetricSamples(
+ Map<String, List<MetricSample>> metrics, String normalizedMetricName) {
+ if (metrics == null || metrics.isEmpty()) {
+ return List.of();
+ }
+ List<MetricSample> exactMatch = metrics.get(normalizedMetricName);
+ if (exactMatch != null) {
+ return exactMatch;
+ }
+ for (Map.Entry<String, List<MetricSample>> entry : metrics.entrySet()) {
+ if (normalizeMetricName(entry.getKey()).equals(normalizedMetricName)) {
+ return entry.getValue();
+ }
+ }
+ return List.of();
+ }
+
+ private static String normalizeMetricName(String metricName) {
+ return metricName == null ? "" :
metricName.trim().toLowerCase(Locale.ROOT);
+ }
+
+ private static OptionalDouble aggregate(List<MetricSample> metrics,
AggregationOp operation) {
+ try {
+ return switch (operation) {
+ case MAX -> metrics.stream()
+ .mapToDouble(GravitinoMetricsEvaluator::metricNumericValue)
+ .max();
+ case MIN -> metrics.stream()
+ .mapToDouble(GravitinoMetricsEvaluator::metricNumericValue)
+ .min();
+ case AVG -> metrics.stream()
+ .mapToDouble(GravitinoMetricsEvaluator::metricNumericValue)
+ .average();
+ case LATEST -> latest(metrics);
+ };
+ } catch (Exception e) {
+ return OptionalDouble.empty();
+ }
+ }
+
+ private static OptionalDouble latest(List<MetricSample> metrics) {
+ Optional<MetricSample> latest =
+
metrics.stream().max(Comparator.comparingLong(MetricSample::timestamp));
+ return latest
+ .map(sample -> OptionalDouble.of(metricNumericValue(sample)))
+ .orElseGet(OptionalDouble::empty);
+ }
+
+ private static double metricNumericValue(MetricSample metric) {
+ Object value = metric.statistic().value().value();
+ if (value instanceof Number) {
+ return ((Number) value).doubleValue();
+ }
+ throw new IllegalArgumentException(
+ "Metric value must be numeric, but got "
+ + (value == null ? "null" : value.getClass().getName()));
+ }
+
+ private enum RuleScope {
+ TABLE("table"),
+ JOB("job");
+
+ private final String configValue;
+
+ RuleScope(String configValue) {
+ this.configValue = configValue;
+ }
+
+ private static RuleScope fromConfigValue(String value) {
+ String normalized = value == null ? "" :
value.trim().toLowerCase(Locale.ROOT);
+ for (RuleScope scope : values()) {
+ if (scope.configValue.equals(normalized)) {
+ return scope;
+ }
+ }
+ throw new IllegalArgumentException(
+ "Unsupported rule scope '"
+ + value
+ + "'. Supported scopes are: "
+ + RuleScope.supportedValues());
+ }
+
+ private static RuleScope fromMetricScopeType(MetricScope.Type type) {
+ return switch (type) {
+ case TABLE -> TABLE;
+ case PARTITION -> TABLE;
+ case JOB -> JOB;
+ };
+ }
+
+ private static String supportedValues() {
+ return Arrays.stream(values())
+ .map(scope -> scope.configValue)
+ .collect(Collectors.joining("|"));
+ }
+ }
+
+ private enum AggregationOp {
+ MAX("max"),
+ MIN("min"),
+ AVG("avg"),
+ LATEST("latest");
+
+ private final String configValue;
+
+ AggregationOp(String configValue) {
+ this.configValue = configValue;
+ }
+
+ private static AggregationOp from(String value) {
+ String normalized = value == null ? "" :
value.trim().toLowerCase(Locale.ROOT);
+ for (AggregationOp op : values()) {
+ if (op.configValue.equals(normalized)) {
+ return op;
+ }
+ }
+ throw new IllegalArgumentException(
+ "Unsupported aggregation operation '"
+ + value
+ + "'. Supported operations are: "
+ + supportedValues());
+ }
+
+ private static String supportedValues() {
+ return Arrays.stream(values()).map(op ->
op.configValue).collect(Collectors.joining("|"));
+ }
+ }
+
+ /**
+ * Comparison operation used to verify aggregated {@code after} value
against aggregated {@code
+ * before} value.
+ */
+ private enum ComparisonOp {
+ LT("lt"),
+ LE("le"),
+ GT("gt"),
+ GE("ge"),
+ EQ("eq"),
+ NE("ne");
+
+ private static final double EPSILON = 1e-9;
+ private final String configValue;
+
+ ComparisonOp(String configValue) {
+ this.configValue = configValue;
+ }
+
+ /**
+ * Apply this comparison to evaluated numbers.
+ *
+ * @param left evaluated "after" value
+ * @param right evaluated "before" value
+ * @return true if the comparison passes
+ */
+ private boolean test(double left, double right) {
+ return switch (this) {
+ case LT -> left < right;
+ case LE -> left <= right;
+ case GT -> left > right;
+ case GE -> left >= right;
+ case EQ -> Math.abs(left - right) <= EPSILON;
+ case NE -> Math.abs(left - right) > EPSILON;
+ };
+ }
+
+ private static ComparisonOp from(String value) {
+ String normalized = value == null ? "" :
value.trim().toLowerCase(Locale.ROOT);
+ for (ComparisonOp op : values()) {
+ if (op.configValue.equals(normalized)) {
+ return op;
+ }
+ }
+ throw new IllegalArgumentException(
+ "Unsupported comparison operation '"
+ + value
+ + "'. Supported comparisons are: "
+ + supportedValues());
+ }
+
+ private static String supportedValues() {
+ return Arrays.stream(values()).map(op ->
op.configValue).collect(Collectors.joining("|"));
+ }
+ }
+
+ /** Parsed rule definition for one metric, including aggregation and
comparison semantics. */
+ private static class RuleConfig {
+ private final AggregationOp aggregation;
+ private final ComparisonOp comparison;
+
+ private RuleConfig(AggregationOp aggregation, ComparisonOp comparison) {
+ this.aggregation = aggregation;
+ this.comparison = comparison;
+ }
+
+ @Override
+ public String toString() {
+ return aggregation.name() + ":" + comparison.name();
+ }
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/dummy/DummyTableJobRelationProvider.java
similarity index 60%
copy from
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
copy to
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/dummy/DummyTableJobRelationProvider.java
index 5ab2cebd61..38d9925961 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/dummy/DummyTableJobRelationProvider.java
@@ -17,18 +17,36 @@
* under the License.
*/
-package org.apache.gravitino.maintenance.optimizer.monitor.job;
+package org.apache.gravitino.maintenance.optimizer.monitor.job.dummy;
import java.util.List;
import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
+import
org.apache.gravitino.maintenance.optimizer.api.monitor.TableJobRelationProvider;
import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
-public class JobProviderForTest implements JobProvider {
+/**
+ * No-op {@link TableJobRelationProvider} implementation.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ * <li>Use this provider when you only want table/partition metric
evaluation.
+ * <li>No external configuration is required.
+ * <li>Always returns an empty job identifier list.
+ * </ul>
+ *
+ * <p>Configured by setting {@link
OptimizerConfig#TABLE_JOB_RELATION_PROVIDER_CONFIG} to {@value
+ * #NAME}.
+ */
+public class DummyTableJobRelationProvider implements TableJobRelationProvider
{
+
+ public static final String NAME = "dummy-table-job-relation-provider";
- public static final String NAME = "job-provider-for-test";
- public static final NameIdentifier JOB1 =
NameIdentifier.parse("test.db.job1");
- public static final NameIdentifier JOB2 =
NameIdentifier.parse("test.db.job2");
+ @Override
+ public List<NameIdentifier> jobIdentifiers(NameIdentifier tableIdentifier) {
+ return List.of();
+ }
@Override
public String name() {
@@ -38,11 +56,6 @@ public class JobProviderForTest implements JobProvider {
@Override
public void initialize(OptimizerEnv optimizerEnv) {}
- @Override
- public List<NameIdentifier> jobIdentifiers(NameIdentifier tableIdentifier) {
- return List.of(JOB1, JOB2);
- }
-
@Override
public void close() throws Exception {}
}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationProvider.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationProvider.java
new file mode 100644
index 0000000000..8571397a13
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.job.local;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import
org.apache.gravitino.maintenance.optimizer.api.monitor.TableJobRelationProvider;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+
+/**
+ * A {@link TableJobRelationProvider} that reads table-to-jobs mappings from a
local JSON-lines
+ * file.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ * <li>Set {@link OptimizerConfig#TABLE_JOB_RELATION_PROVIDER_CONFIG} to
{@value #NAME}.
+ * <li>Set {@link #JOB_FILE_PATH_CONFIG} ({@code
+ * gravitino.optimizer.monitor.localTableJobRelationProvider.filePath})
to a readable
+ * JSON-lines file path.
+ * <li>Optional: set {@link
OptimizerConfig#GRAVITINO_DEFAULT_CATALOG_CONFIG} to resolve two-level
+ * table identifiers ({@code schema.table}) into {@code
<defaultCatalog>.schema.table}.
+ * </ul>
+ *
+ * <p>Each line in the job file is a JSON object:
+ *
+ * <pre>{@code
+ * {"identifier":"catalog.schema.table","job-identifiers":["job1","job2"]}
+ * {"identifier":"schema.table","job-identifiers":["job3"]}
+ * }</pre>
+ *
+ * <p>Format rules:
+ *
+ * <ul>
+ * <li>{@code identifier}: table name, either {@code catalog.schema.table}
or {@code schema.table}
+ * (when default catalog is configured).
+ * <li>{@code job-identifiers}: array of job identifiers, each parseable by
{@link
+ * NameIdentifier#parse(String)}.
+ * <li>Malformed lines and invalid entries are skipped with warning logs.
+ * </ul>
+ */
+public class LocalTableJobRelationProvider implements TableJobRelationProvider
{
+
+ public static final String NAME = "local-table-job-relation-provider";
+ public static final String CONFIG_NAME = "localTableJobRelationProvider";
+
+ /** Config key for the local JSON-lines job mapping file path. */
+ public static final String JOB_FILE_PATH_CONFIG =
+ OptimizerConfig.MONITOR_PREFIX + CONFIG_NAME + ".filePath";
+
+ private LocalTableJobRelationReader jobReader;
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {
+ String path = optimizerEnv.config().getRawString(JOB_FILE_PATH_CONFIG);
+ if (StringUtils.isBlank(path)) {
+ throw new IllegalArgumentException(
+ JOB_FILE_PATH_CONFIG + " must be provided for
LocalTableJobRelationProvider");
+ }
+ Path jobFilePath = Path.of(path).toAbsolutePath().normalize();
+ if (!Files.exists(jobFilePath)
+ || !Files.isRegularFile(jobFilePath)
+ || !Files.isReadable(jobFilePath)) {
+ throw new IllegalArgumentException(
+ "Configured job file path is not a readable file: " + jobFilePath);
+ }
+ String defaultCatalog =
+
optimizerEnv.config().get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG);
+ this.jobReader = new LocalTableJobRelationReader(jobFilePath,
defaultCatalog);
+ }
+
+ @Override
+ public List<NameIdentifier> jobIdentifiers(NameIdentifier tableIdentifier) {
+ if (jobReader == null) {
+ throw new IllegalStateException("LocalTableJobRelationProvider is not
initialized");
+ }
+ return jobReader.readJobIdentifiers(tableIdentifier);
+ }
+
+ @Override
+ public void close() throws Exception {}
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationReader.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationReader.java
new file mode 100644
index 0000000000..0e01cb7ba3
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationReader.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.job.local;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Shared reader for file-based table-job relation providers. */
+class LocalTableJobRelationReader {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LocalTableJobRelationReader.class);
+
+ private final Path jobFilePath;
+ private final String defaultCatalogName;
+
+ LocalTableJobRelationReader(Path jobFilePath, String defaultCatalogName) {
+ Preconditions.checkArgument(jobFilePath != null, "jobFilePath cannot be
null");
+ this.jobFilePath = jobFilePath;
+ this.defaultCatalogName = defaultCatalogName;
+ }
+
+ List<NameIdentifier> readJobIdentifiers(NameIdentifier tableIdentifier) {
+ Set<NameIdentifier> jobs = new LinkedHashSet<>();
+
+ try (BufferedReader reader = Files.newBufferedReader(jobFilePath,
StandardCharsets.UTF_8)) {
+ String line;
+ int lineNumber = 0;
+ while ((line = reader.readLine()) != null) {
+ lineNumber++;
+ if (StringUtils.isBlank(line)) {
+ continue;
+ }
+
+ JobMappingLine mappingLine = parseJsonLine(line, lineNumber);
+ if (mappingLine == null) {
+ continue;
+ }
+
+ Optional<NameIdentifier> identifier =
parseIdentifier(mappingLine.identifier, lineNumber);
+ if (identifier.isEmpty() || !identifier.get().equals(tableIdentifier))
{
+ continue;
+ }
+
+ collectJobs(mappingLine.jobIdentifiers, jobs, lineNumber);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(
+ String.format(
+ "Failed to read job mappings from file %s for table %s",
+ jobFilePath, tableIdentifier),
+ e);
+ }
+
+ return List.copyOf(jobs);
+ }
+
+ private JobMappingLine parseJsonLine(String line, int lineNumber) {
+ try {
+ return JsonUtils.anyFieldMapper().readValue(line, JobMappingLine.class);
+ } catch (IOException e) {
+ LOG.warn("Skip malformed job mapping at line {}: {}", lineNumber, line);
+ return null;
+ }
+ }
+
+ private Optional<NameIdentifier> parseIdentifier(String identifierText, int
lineNumber) {
+ if (StringUtils.isBlank(identifierText)) {
+ return Optional.empty();
+ }
+
+ Optional<NameIdentifier> parsed =
+ IdentifierUtils.parseTableIdentifier(identifierText,
defaultCatalogName);
+ if (parsed.isEmpty()) {
+ LOG.warn("Skip invalid table identifier at line {}: {}", lineNumber,
identifierText);
+ }
+ return parsed;
+ }
+
+ private void collectJobs(List<Object> jobIdentifiers, Set<NameIdentifier>
jobs, int lineNumber) {
+ if (jobIdentifiers == null || jobIdentifiers.isEmpty()) {
+ return;
+ }
+ for (Object jobIdentifier : jobIdentifiers) {
+ if (!(jobIdentifier instanceof String)) {
+ continue;
+ }
+ String jobText = (String) jobIdentifier;
+ if (StringUtils.isBlank(jobText)) {
+ continue;
+ }
+ try {
+ jobs.add(NameIdentifier.parse(jobText));
+ } catch (Exception e) {
+ LOG.warn("Skip invalid job identifier at line {}: {}", lineNumber,
jobText);
+ }
+ }
+ }
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ private static class JobMappingLine {
+ @JsonProperty("identifier")
+ private String identifier;
+
+ @JsonProperty("job-identifiers")
+ private List<Object> jobIdentifiers;
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/GravitinoMetricsProvider.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/GravitinoMetricsProvider.java
new file mode 100644
index 0000000000..a7a1b5c56c
--- /dev/null
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/GravitinoMetricsProvider.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.metrics;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.common.MetricSampleImpl;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import
org.apache.gravitino.maintenance.optimizer.common.PartitionMetricSampleImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import
org.apache.gravitino.maintenance.optimizer.common.util.StatisticValueUtils;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricRecord;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricsRepository;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+
+/**
+ * {@link MetricsProvider} implementation backed by Gravitino metric storage.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ * <li>Set {@link
org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig
+ * #METRICS_PROVIDER_CONFIG} to {@value #NAME}.
+ * <li>This provider initializes an internal {@link
GenericJdbcMetricsRepository} and reads
+ * metrics through {@link MetricsRepository} APIs.
+ * </ul>
+ *
+ * <p>Behavior:
+ *
+ * <ul>
+ * <li>{@link #jobMetrics(NameIdentifier, long, long)} returns job metrics
in the requested time
+ * range.
+ * <li>{@link #tableMetrics(NameIdentifier, long, long)} returns table
metrics in the requested
+ * time range.
+ * <li>{@link #partitionMetrics(NameIdentifier, PartitionPath, long, long)}
returns partition
+ * metrics using encoded partition path.
+ * <li>Storage records are converted to monitor-domain {@link MetricSample}
values.
+ * </ul>
+ */
+public class GravitinoMetricsProvider implements MetricsProvider {
+
+ public static final String NAME = "gravitino-metrics-provider";
+ private MetricsRepository metricsRepository;
+
+ @Override
+ public String name() {
+ return NAME;
+ }
+
+ @Override
+ public void initialize(OptimizerEnv optimizerEnv) {
+ MetricsRepository repository = new GenericJdbcMetricsRepository();
+ repository.initialize(optimizerEnv.config().getAllConfig());
+ this.metricsRepository = repository;
+ }
+
+ @Override
+ public Map<String, List<MetricSample>> jobMetrics(
+ NameIdentifier jobIdentifier, long startTime, long endTime) {
+ ensureInitialized();
+ Map<String, List<MetricRecord>> metrics =
+ metricsRepository.getJobMetrics(jobIdentifier, startTime, endTime);
+
+ return toSingleMetrics(metrics, Optional.empty());
+ }
+
+ @Override
+ public Map<String, List<MetricSample>> tableMetrics(
+ NameIdentifier tableIdentifier, long startTime, long endTime) {
+ ensureInitialized();
+ Map<String, List<MetricRecord>> metrics =
+ metricsRepository.getTableMetrics(tableIdentifier, startTime, endTime);
+
+ return toSingleMetrics(metrics, Optional.empty());
+ }
+
+ @Override
+ public Map<String, List<MetricSample>> partitionMetrics(
+ NameIdentifier tableIdentifier, PartitionPath partitionPath, long
startTime, long endTime) {
+ ensureInitialized();
+ Map<String, List<MetricRecord>> metrics =
+ metricsRepository.getPartitionMetrics(
+ tableIdentifier,
PartitionUtils.encodePartitionPath(partitionPath), startTime, endTime);
+
+ return toSingleMetrics(metrics, Optional.of(partitionPath));
+ }
+
+ private Map<String, List<MetricSample>> toSingleMetrics(
+ Map<String, List<MetricRecord>> metrics, Optional<PartitionPath>
partitionPath) {
+ return metrics.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ entry -> entry.getKey(),
+ entry ->
+ entry.getValue().stream()
+ .map(
+ storageMetric ->
+ toSingleMetric(storageMetric, partitionPath,
entry.getKey()))
+ .collect(Collectors.toList())));
+ }
+
+ private MetricSample toSingleMetric(
+ MetricRecord metric, Optional<PartitionPath> partitionPath, String
metricName) {
+ StatisticEntry<?> statistic =
+ new StatisticEntryImpl<>(metricName,
StatisticValueUtils.fromString(metric.getValue()));
+ return partitionPath
+ .<MetricSample>map(
+ partition -> new PartitionMetricSampleImpl(metric.getTimestamp(),
statistic, partition))
+ .orElseGet(() -> new MetricSampleImpl(metric.getTimestamp(),
statistic));
+ }
+
+ private void ensureInitialized() {
+ Preconditions.checkState(
+ metricsRepository != null, "GravitinoMetricsProvider is not
initialized");
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (metricsRepository != null) {
+ metricsRepository.close();
+ }
+ }
+}
diff --git
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/calculator/local/AbstractStatisticsImporter.java
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/calculator/local/AbstractStatisticsImporter.java
index 0bb5a693b0..4b87faa118 100644
---
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/calculator/local/AbstractStatisticsImporter.java
+++
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/calculator/local/AbstractStatisticsImporter.java
@@ -44,6 +44,7 @@ import
org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
import
org.apache.gravitino.maintenance.optimizer.api.common.TableAndPartitionStatistics;
import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils;
import org.apache.gravitino.stats.StatisticValue;
import org.apache.gravitino.stats.StatisticValues;
import org.slf4j.Logger;
@@ -142,16 +143,16 @@ abstract class AbstractStatisticsImporter implements
StatisticsImporter {
new StatisticsRecordVisitor() {
@Override
public void onTable(StatisticsRecord record) {
- NameIdentifier identifier =
parseTableIdentifier(record.identifier());
- if (identifier == null) {
+ Optional<NameIdentifier> identifier =
parseTableIdentifier(record.identifier());
+ if (identifier.isEmpty()) {
return;
}
- if (targetIdentifier != null &&
!targetIdentifier.equals(identifier)) {
+ if (targetIdentifier != null &&
!targetIdentifier.equals(identifier.get())) {
return;
}
Map<String, StatisticValue<?>> statisticsByName =
- aggregated.computeIfAbsent(identifier, k -> new
LinkedHashMap<>());
+ aggregated.computeIfAbsent(identifier.get(), k -> new
LinkedHashMap<>());
populateStatistics(record, statisticsByName);
}
});
@@ -171,16 +172,16 @@ abstract class AbstractStatisticsImporter implements
StatisticsImporter {
new StatisticsRecordVisitor() {
@Override
public void onTable(StatisticsRecord record) {
- NameIdentifier identifier =
parseTableIdentifier(record.identifier());
- if (targetIdentifier.equals(identifier)) {
+ Optional<NameIdentifier> identifier =
parseTableIdentifier(record.identifier());
+ if (identifier.isPresent() &&
targetIdentifier.equals(identifier.get())) {
populateStatistics(record, tableStatistics);
}
}
@Override
public void onPartition(StatisticsRecord record) {
- NameIdentifier identifier =
parseTableIdentifier(record.identifier());
- if (!targetIdentifier.equals(identifier)) {
+ Optional<NameIdentifier> identifier =
parseTableIdentifier(record.identifier());
+ if (identifier.isEmpty() ||
!targetIdentifier.equals(identifier.get())) {
return;
}
@@ -210,19 +211,20 @@ abstract class AbstractStatisticsImporter implements
StatisticsImporter {
new StatisticsRecordVisitor() {
@Override
public void onTable(StatisticsRecord record) {
- NameIdentifier identifier =
parseTableIdentifier(record.identifier());
- if (identifier == null) {
+ Optional<NameIdentifier> identifier =
parseTableIdentifier(record.identifier());
+ if (identifier.isEmpty()) {
return;
}
Map<String, StatisticValue<?>> tableStats =
- tableStatisticsByIdentifier.computeIfAbsent(identifier, k ->
new LinkedHashMap<>());
+ tableStatisticsByIdentifier.computeIfAbsent(
+ identifier.get(), k -> new LinkedHashMap<>());
populateStatistics(record, tableStats);
}
@Override
public void onPartition(StatisticsRecord record) {
- NameIdentifier identifier =
parseTableIdentifier(record.identifier());
- if (identifier == null) {
+ Optional<NameIdentifier> identifier =
parseTableIdentifier(record.identifier());
+ if (identifier.isEmpty()) {
return;
}
@@ -233,7 +235,7 @@ abstract class AbstractStatisticsImporter implements
StatisticsImporter {
Map<PartitionPath, Map<String, StatisticValue<?>>>
partitionStatsByPath =
partitionStatisticsByIdentifier.computeIfAbsent(
- identifier, k -> new LinkedHashMap<>());
+ identifier.get(), k -> new LinkedHashMap<>());
Map<String, StatisticValue<?>> partitionStatsByName =
partitionStatsByPath.computeIfAbsent(
partitionPathOpt.get(), k -> new LinkedHashMap<>());
@@ -265,16 +267,16 @@ abstract class AbstractStatisticsImporter implements
StatisticsImporter {
new StatisticsRecordVisitor() {
@Override
public void onJob(StatisticsRecord record) {
- NameIdentifier identifier =
parseJobIdentifier(record.identifier());
- if (identifier == null) {
+ Optional<NameIdentifier> identifier =
parseJobIdentifier(record.identifier());
+ if (identifier.isEmpty()) {
return;
}
- if (targetIdentifier != null &&
!targetIdentifier.equals(identifier)) {
+ if (targetIdentifier != null &&
!targetIdentifier.equals(identifier.get())) {
return;
}
Map<String, StatisticValue<?>> statisticsByName =
- aggregated.computeIfAbsent(identifier, k -> new
LinkedHashMap<>());
+ aggregated.computeIfAbsent(identifier.get(), k -> new
LinkedHashMap<>());
populateStatistics(record, statisticsByName);
}
});
@@ -406,42 +408,24 @@ abstract class AbstractStatisticsImporter implements
StatisticsImporter {
}
}
- private NameIdentifier parseTableIdentifier(String identifierText) {
+ private Optional<NameIdentifier> parseTableIdentifier(String identifierText)
{
if (StringUtils.isBlank(identifierText)) {
- return null;
+ return Optional.empty();
}
- try {
- NameIdentifier parsed = NameIdentifier.parse(identifierText);
- int levels = parsed.namespace().levels().length;
- if (levels == 0) {
- return parsed;
- } else if (levels == 1) {
- if (StringUtils.isNotBlank(defaultCatalogName)) {
- return NameIdentifier.of(
- defaultCatalogName, parsed.namespace().levels()[0],
parsed.name());
- }
- return parsed;
- } else if (levels == 2) {
- return parsed;
- } else {
- return null;
- }
- } catch (Exception e) {
+ Optional<NameIdentifier> parsed =
+ IdentifierUtils.parseTableIdentifier(identifierText,
defaultCatalogName);
+ if (parsed.isEmpty()) {
LOG.warn("Skip line with invalid identifier: {}", identifierText);
- return null;
}
+ return parsed;
}
- private NameIdentifier parseJobIdentifier(String identifierText) {
- if (StringUtils.isBlank(identifierText)) {
- return null;
- }
- try {
- return NameIdentifier.parse(identifierText);
- } catch (Exception e) {
+ private Optional<NameIdentifier> parseJobIdentifier(String identifierText) {
+ Optional<NameIdentifier> parsed =
IdentifierUtils.parseJobIdentifier(identifierText);
+ if (parsed.isEmpty() && StringUtils.isNotBlank(identifierText)) {
LOG.warn("Skip line with invalid identifier: {}", identifierText);
- return null;
}
+ return parsed;
}
private static final class StatisticsRecord {
diff --git
a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
index 2458ac7926..c059ab3483 100644
---
a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -23,3 +23,7 @@
org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter
org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter
org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater
org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater
+org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider
+org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider
+org.apache.gravitino.maintenance.optimizer.monitor.job.local.LocalTableJobRelationProvider
+org.apache.gravitino.maintenance.optimizer.monitor.callback.ConsoleMonitorCallback
diff --git
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
similarity index 67%
copy from
maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
copy to
maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
index 4cc9447c73..d151270911 100644
---
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
@@ -17,8 +17,4 @@
# under the License.
#
-org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
-org.apache.gravitino.maintenance.optimizer.updater.MetricsUpdaterForTest
-org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest
-org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest
-org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest
+org.apache.gravitino.maintenance.optimizer.monitor.evaluator.GravitinoMetricsEvaluator
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
index 4b4958ff53..dd50fc4962 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
@@ -103,4 +103,67 @@ class TestIdentifierUtils {
IllegalArgumentException.class,
() -> IdentifierUtils.requireTableIdentifierNormalized(identifier));
}
+
+ @Test
+ void testParseTableIdentifierWithDefaultCatalog() {
+ NameIdentifier result =
+ IdentifierUtils.parseTableIdentifier("db.table",
"catalog").orElseThrow();
+
+ Assertions.assertEquals(NameIdentifier.of("catalog", "db", "table"),
result);
+ }
+
+ @Test
+ void testParseTableIdentifierWithoutDefaultCatalogReturnsEmpty() {
+ Assertions.assertTrue(IdentifierUtils.parseTableIdentifier("db.table",
null).isEmpty());
+ }
+
+ @Test
+ void testParseTableIdentifierWithThreeLevelIdentifier() {
+ NameIdentifier result =
+ IdentifierUtils.parseTableIdentifier("catalog.db.table",
null).orElseThrow();
+
+ Assertions.assertEquals(NameIdentifier.of("catalog", "db", "table"),
result);
+ }
+
+ @Test
+ void testParseTableIdentifierWithSingleLevelReturnsEmpty() {
+ Assertions.assertTrue(IdentifierUtils.parseTableIdentifier("table",
"catalog").isEmpty());
+ }
+
+ @Test
+ void testParseTableIdentifierWithInvalidIdentifierReturnsEmpty() {
+ Assertions.assertTrue(
+ IdentifierUtils.parseTableIdentifier("catalog.db.schema.extra.table",
"c").isEmpty());
+ }
+
+ @Test
+ void testParseJobIdentifierWithValidIdentifier() {
+ NameIdentifier result =
+
IdentifierUtils.parseJobIdentifier("org.team.pipeline.job1").orElseThrow();
+ Assertions.assertEquals("org.team.pipeline.job1", result.toString());
+ }
+
+ @Test
+ void testParseJobIdentifierWithSingleLevelName() {
+ NameIdentifier result =
IdentifierUtils.parseJobIdentifier("job").orElseThrow();
+ Assertions.assertEquals("job", result.toString());
+ }
+
+ @Test
+ void testParseJobIdentifierWithTwoLevelName() {
+ NameIdentifier result =
IdentifierUtils.parseJobIdentifier("team.job").orElseThrow();
+ Assertions.assertEquals("team.job", result.toString());
+ }
+
+ @Test
+ void testParseJobIdentifierWithBlankOrInvalidIdentifierReturnsEmpty() {
+ Assertions.assertTrue(IdentifierUtils.parseJobIdentifier(" ").isEmpty());
+
Assertions.assertTrue(IdentifierUtils.parseJobIdentifier("invalid..job").isEmpty());
+ }
+
+ @Test
+ void testParseJobIdentifierWithTooManyLevels() {
+ NameIdentifier result =
IdentifierUtils.parseJobIdentifier("a.b.c.d.e").orElseThrow();
+ Assertions.assertEquals("a.b.c.d.e", result.toString());
+ }
}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
index 9801b9f727..3ac0e1a84d 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
@@ -19,12 +19,15 @@
package org.apache.gravitino.maintenance.optimizer.common.util;
+import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
import
org.apache.gravitino.maintenance.optimizer.api.recommender.TableMetadataProvider;
+import
org.apache.gravitino.maintenance.optimizer.monitor.callback.ConsoleMonitorCallback;
import
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
-import
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider;
import
org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest;
import
org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter;
import
org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter;
@@ -78,8 +81,20 @@ public class TestProviderUtils {
ProviderUtils.createMetricsProviderInstance(MetricsProviderForTest.NAME)
instanceof MetricsProviderForTest);
Assertions.assertTrue(
- ProviderUtils.createJobProviderInstance(JobProviderForTest.NAME)
- instanceof JobProviderForTest);
+
ProviderUtils.createTableJobRelationProviderInstance(TableJobRelationProviderForTest.NAME)
+ instanceof TableJobRelationProviderForTest);
+ DummyTableJobRelationProvider dummyTableJobRelationProvider =
+ (DummyTableJobRelationProvider)
+ ProviderUtils.createTableJobRelationProviderInstance(
+ DummyTableJobRelationProvider.NAME);
+ Assertions.assertNotNull(dummyTableJobRelationProvider);
+ Assertions.assertTrue(
+ dummyTableJobRelationProvider
+ .jobIdentifiers(NameIdentifier.parse("catalog.db.table"))
+ .isEmpty());
+ Assertions.assertTrue(
+
ProviderUtils.createMonitorCallbackInstance(ConsoleMonitorCallback.NAME)
+ instanceof ConsoleMonitorCallback);
Assertions.assertTrue(
ProviderUtils.createMonitorCallbackInstance(MonitorCallbackForTest.NAME)
instanceof MonitorCallbackForTest);
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
index 5185edf7ce..9a3d934e1c 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
@@ -30,8 +30,9 @@ import
org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
import
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.GravitinoMetricsEvaluator;
import
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest;
-import
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
import
org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -44,7 +45,9 @@ public class TestMonitor {
new OptimizerConfig(
ImmutableMap.<String, String>builder()
.put(OptimizerConfig.METRICS_PROVIDER_CONFIG.getKey(),
MetricsProviderForTest.NAME)
- .put(OptimizerConfig.JOB_PROVIDER_CONFIG.getKey(),
JobProviderForTest.NAME)
+ .put(
+
OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG.getKey(),
+ TableJobRelationProviderForTest.NAME)
.put(
OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(),
MetricsEvaluatorForTest.NAME)
.put(OptimizerConfig.MONITOR_CALLBACKS_CONFIG.getKey(),
MonitorCallbackForTest.NAME)
@@ -92,7 +95,7 @@ public class TestMonitor {
.longValue());
Assertions.assertEquals(MetricScope.Type.JOB, jobResult1.scope().type());
- Assertions.assertEquals(JobProviderForTest.JOB1,
jobResult1.scope().identifier());
+ Assertions.assertEquals(TableJobRelationProviderForTest.JOB1,
jobResult1.scope().identifier());
Assertions.assertTrue(jobResult1.evaluation());
Assertions.assertEquals(99L,
jobResult1.beforeMetrics().get("duration").get(0).timestamp());
Assertions.assertEquals(102L,
jobResult1.afterMetrics().get("duration").get(0).timestamp());
@@ -106,7 +109,7 @@ public class TestMonitor {
.longValue());
Assertions.assertEquals(MetricScope.Type.JOB, jobResult2.scope().type());
- Assertions.assertEquals(JobProviderForTest.JOB2,
jobResult2.scope().identifier());
+ Assertions.assertEquals(TableJobRelationProviderForTest.JOB2,
jobResult2.scope().identifier());
Assertions.assertFalse(jobResult2.evaluation());
Assertions.assertEquals(98L,
jobResult2.beforeMetrics().get("duration").get(0).timestamp());
Assertions.assertEquals(104L,
jobResult2.afterMetrics().get("duration").get(0).timestamp());
@@ -126,7 +129,9 @@ public class TestMonitor {
new OptimizerConfig(
ImmutableMap.<String, String>builder()
.put(OptimizerConfig.METRICS_PROVIDER_CONFIG.getKey(),
MetricsProviderForTest.NAME)
- .put(OptimizerConfig.JOB_PROVIDER_CONFIG.getKey(),
JobProviderForTest.NAME)
+ .put(
+
OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG.getKey(),
+ TableJobRelationProviderForTest.NAME)
.put(
OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(),
MetricsEvaluatorForTest.NAME)
.put(OptimizerConfig.MONITOR_CALLBACKS_CONFIG.getKey(),
MonitorCallbackForTest.NAME)
@@ -172,7 +177,9 @@ public class TestMonitor {
new OptimizerConfig(
ImmutableMap.<String, String>builder()
.put(OptimizerConfig.METRICS_PROVIDER_CONFIG.getKey(),
MetricsProviderForTest.NAME)
- .put(OptimizerConfig.JOB_PROVIDER_CONFIG.getKey(),
JobProviderForTest.NAME)
+ .put(
+
OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG.getKey(),
+ TableJobRelationProviderForTest.NAME)
.put(
OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(),
MetricsEvaluatorForTest.NAME)
.put(OptimizerConfig.MONITOR_CALLBACKS_CONFIG.getKey(),
MonitorCallbackForTest.NAME)
@@ -189,4 +196,35 @@ public class TestMonitor {
Assertions.assertTrue(exception.getMessage().contains("time range
overflow"));
}
}
+
+ @Test
+ public void testEvaluateMetricsWithScopedGravitinoEvaluatorRules() throws
Exception {
+ OptimizerConfig config =
+ new OptimizerConfig(
+ ImmutableMap.<String, String>builder()
+ .put(OptimizerConfig.METRICS_PROVIDER_CONFIG.getKey(),
MetricsProviderForTest.NAME)
+ .put(
+
OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG.getKey(),
+ TableJobRelationProviderForTest.NAME)
+ .put(
+ OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(),
+ GravitinoMetricsEvaluator.NAME)
+ .put(
+ GravitinoMetricsEvaluator.EVALUATION_RULES_CONFIG,
+ "table:row_count:avg:le,job:duration:latest:le")
+ .build());
+
+ OptimizerEnv env = new OptimizerEnv(config);
+ NameIdentifier tableIdentifier = NameIdentifier.parse("test.db.table");
+
+ List<EvaluationResult> results;
+ try (Monitor monitor = new Monitor(env)) {
+ results = monitor.evaluateMetrics(tableIdentifier, 100L, 10L,
Optional.empty());
+ }
+
+ Assertions.assertEquals(3, results.size(), "Expected one table result and
two job results");
+ Assertions.assertFalse(results.get(0).evaluation(), "Table rule should
fail for test metrics");
+ Assertions.assertFalse(results.get(1).evaluation(), "Job1 latest duration
rule should fail");
+ Assertions.assertFalse(results.get(2).evaluation(), "Job2 latest duration
rule should fail");
+ }
}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
index 9d9a994d04..fab16fcf1f 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
-import
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
public class MetricsEvaluatorForTest implements MetricsEvaluator {
@@ -46,7 +46,7 @@ public class MetricsEvaluatorForTest implements
MetricsEvaluator {
INVOCATIONS.incrementAndGet();
if (FAIL_JOB2
&& scope.type() == MetricScope.Type.JOB
- && JobProviderForTest.JOB2.equals(scope.identifier())) {
+ && TableJobRelationProviderForTest.JOB2.equals(scope.identifier())) {
return false;
}
return true;
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/TestGravitinoMetricsEvaluator.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/TestGravitinoMetricsEvaluator.java
new file mode 100644
index 0000000000..5120d42e24
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/TestGravitinoMetricsEvaluator.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.evaluator;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
+import org.apache.gravitino.maintenance.optimizer.common.MetricSampleImpl;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoMetricsEvaluator {
+
+ @Test
+ public void testEvaluateMetricsWithMaxOperation() {
+ GravitinoMetricsEvaluator evaluator =
createEvaluator("table:row_count:max:le");
+ MetricScope scope =
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ scope,
+ Map.of(
+ "row_count",
+ List.of(
+ metric(100L, "row_count", 10L),
+ metric(101L, "row_count", 50L),
+ metric(102L, "row_count", 20L))),
+ Map.of("row_count", List.of(metric(110L, "row_count", 30L))));
+
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testEvaluateMetricsWithMinOperation() {
+ GravitinoMetricsEvaluator evaluator =
createEvaluator("table:row_count:min:le");
+ MetricScope scope =
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ scope,
+ Map.of(
+ "row_count",
+ List.of(metric(100L, "row_count", 10L), metric(101L,
"row_count", 30L))),
+ Map.of("row_count", List.of(metric(110L, "row_count", 12L))));
+
+ Assertions.assertFalse(result);
+ }
+
+ @Test
+ public void testEvaluateMetricsWithAvgOperation() {
+ GravitinoMetricsEvaluator evaluator =
createEvaluator("table:row_count:avg:le");
+ MetricScope scope =
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ scope,
+ Map.of(
+ "row_count",
+ List.of(metric(100L, "row_count", 10L), metric(101L,
"row_count", 20L))),
+ Map.of("row_count", List.of(metric(110L, "row_count", 15L))));
+
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testEvaluateMetricsWithLatestOperation() {
+ GravitinoMetricsEvaluator evaluator =
createEvaluator("table:row_count:latest:le");
+ MetricScope scope =
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ scope,
+ Map.of(
+ "row_count",
+ List.of(metric(100L, "row_count", 100L), metric(105L,
"row_count", 10L))),
+ Map.of("row_count", List.of(metric(110L, "row_count", 20L))));
+
+ Assertions.assertFalse(result);
+ }
+
+ @Test
+ public void testEvaluateMetricsForPartitionUsesTableRules() {
+ GravitinoMetricsEvaluator evaluator =
createEvaluator("table:row_count:latest:le");
+ PartitionPath partitionPath =
+ PartitionPath.of(List.of(new PartitionEntryImpl("dt", "2026-02-14")));
+ MetricScope scope =
+ MetricScope.forPartition(NameIdentifier.parse("catalog.db.table"),
partitionPath);
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ scope,
+ Map.of(
+ "row_count",
+ List.of(metric(100L, "row_count", 100L), metric(105L,
"row_count", 10L))),
+ Map.of("row_count", List.of(metric(110L, "row_count", 20L))));
+
+ Assertions.assertFalse(result);
+ }
+
+ @Test
+ public void testEvaluateMetricsDistinguishTableAndJobRules() {
+ GravitinoMetricsEvaluator evaluator =
+ createEvaluator("table:row_count:avg:le,job:duration:latest:le");
+
+ boolean tableResult =
+ evaluator.evaluateMetrics(
+ MetricScope.forTable(NameIdentifier.parse("catalog.db.table")),
+ Map.of(
+ "row_count",
+ List.of(metric(100L, "row_count", 10L), metric(101L,
"row_count", 20L))),
+ Map.of("row_count", List.of(metric(110L, "row_count", 15L))));
+ Assertions.assertTrue(tableResult);
+
+ boolean jobResult =
+ evaluator.evaluateMetrics(
+ MetricScope.forJob(NameIdentifier.parse("job1")),
+ Map.of("duration", List.of(metric(100L, "duration", 3L),
metric(105L, "duration", 5L))),
+ Map.of("duration", List.of(metric(110L, "duration", 10L))));
+ Assertions.assertFalse(jobResult);
+ }
+
+ @Test
+ public void testEvaluateMetricsMissingSelectedMetricSkipsRule() {
+ GravitinoMetricsEvaluator evaluator =
createEvaluator("table:row_count:avg:le");
+ MetricScope scope =
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ scope,
+ Map.of("other_metric", List.of(metric(100L, "other_metric", 10L))),
+ Map.of("other_metric", List.of(metric(110L, "other_metric", 5L))));
+
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testEvaluateMetricsNonNumericValueSkipsRule() {
+ GravitinoMetricsEvaluator evaluator =
createEvaluator("table:row_count:avg:le");
+ MetricScope scope =
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ scope,
+ Map.of("row_count", List.of(metricString(100L, "row_count",
"invalid"))),
+ Map.of("row_count", List.of(metricString(110L, "row_count",
"invalid2"))));
+
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testEvaluateMetricsWithoutRulesForScopeReturnsTrue() {
+ GravitinoMetricsEvaluator evaluator =
createEvaluator("table:row_count:avg:le");
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ MetricScope.forJob(NameIdentifier.parse("job1")),
+ Map.of("duration", List.of(metric(100L, "duration", 10L))),
+ Map.of("duration", List.of(metric(110L, "duration", 20L))));
+
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testEvaluateMetricsWithMixedCaseRuleMetricName() {
+ GravitinoMetricsEvaluator evaluator =
createEvaluator("table:Row_Count:avg:le");
+ MetricScope scope =
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ scope,
+ Map.of(
+ "row_count",
+ List.of(metric(100L, "row_count", 10L), metric(101L,
"row_count", 20L))),
+ Map.of("row_count", List.of(metric(110L, "row_count", 15L))));
+
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testEvaluateMetricsWithEmptyBeforeAndAfterMetricsReturnsTrue() {
+ GravitinoMetricsEvaluator evaluator =
createEvaluator("table:row_count:avg:le");
+ MetricScope scope =
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+ boolean result = evaluator.evaluateMetrics(scope, Map.of(), Map.of());
+ Assertions.assertTrue(result);
+
+ boolean emptyListResult =
+ evaluator.evaluateMetrics(
+ scope, Map.of("row_count", List.of()), Map.of("row_count",
List.of()));
+ Assertions.assertTrue(emptyListResult);
+ }
+
+ @Test
+ public void testEvaluateMetricsReturnsFalseWhenAnyRuleFails() {
+ GravitinoMetricsEvaluator evaluator =
+ createEvaluator("table:row_count:avg:le,table:file_count:max:le");
+ MetricScope scope =
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ scope,
+ Map.of(
+ "row_count",
+ List.of(metric(100L, "row_count", 10L), metric(101L,
"row_count", 20L)),
+ "file_count",
+ List.of(metric(100L, "file_count", 5L), metric(101L,
"file_count", 10L))),
+ Map.of(
+ "row_count",
+ List.of(metric(110L, "row_count", 15L)),
+ "file_count",
+ List.of(metric(110L, "file_count", 11L))));
+
+ Assertions.assertFalse(result);
+ }
+
+ @Test
+ public void testInitializeAcceptsWhitespaceAndCaseInsensitiveRuleTokens() {
+ GravitinoMetricsEvaluator evaluator =
+ createEvaluator(" TABLE : row_count : AVG : LE , JOB : duration :
latest : le , ");
+
+ boolean tableResult =
+ evaluator.evaluateMetrics(
+ MetricScope.forTable(NameIdentifier.parse("catalog.db.table")),
+ Map.of(
+ "row_count",
+ List.of(metric(100L, "row_count", 10L), metric(101L,
"row_count", 20L))),
+ Map.of("row_count", List.of(metric(110L, "row_count", 15L))));
+ Assertions.assertTrue(tableResult);
+
+ boolean jobResult =
+ evaluator.evaluateMetrics(
+ MetricScope.forJob(NameIdentifier.parse("job1")),
+ Map.of("duration", List.of(metric(100L, "duration", 3L),
metric(105L, "duration", 5L))),
+ Map.of("duration", List.of(metric(110L, "duration", 10L))));
+ Assertions.assertFalse(jobResult);
+ }
+
+ @Test
+ public void testParseEvaluationRulesReturnsEmptyForBlankInput() {
+ Assertions.assertTrue(GravitinoMetricsEvaluator.parseEvaluationRules("
").isEmpty());
+ }
+
+ @Test
+ public void testParseEvaluationRulesParsesExpectedScopesAndRules() {
+ Map<?, ?> parsed =
+ GravitinoMetricsEvaluator.parseEvaluationRules(
+ "table:row_count:avg:le,job:duration:max:gt");
+
+ Assertions.assertEquals(2, parsed.size());
+ Map<?, ?> tableRules = findScopeRules(parsed, "TABLE");
+ Map<?, ?> jobRules = findScopeRules(parsed, "JOB");
+ Assertions.assertEquals(1, tableRules.size());
+ Assertions.assertEquals(1, jobRules.size());
+ Assertions.assertEquals("AVG:LE", tableRules.get("row_count").toString());
+ Assertions.assertEquals("MAX:GT", jobRules.get("duration").toString());
+ }
+
+ @Test
+ public void testParseEvaluationRulesCreatesImmutableMaps() {
+ Map<?, ?> parsed =
GravitinoMetricsEvaluator.parseEvaluationRules("table:row_count:avg:le");
+
+ @SuppressWarnings("unchecked")
+ Map<Object, Object> parsedMutableView = (Map<Object, Object>) parsed;
+ Assertions.assertThrows(
+ UnsupportedOperationException.class, () ->
parsedMutableView.put("TABLE", Map.of()));
+ Map<?, ?> tableRules = findScopeRules(parsed, "TABLE");
+ @SuppressWarnings("unchecked")
+ Map<Object, Object> tableRulesMutableView = (Map<Object, Object>)
tableRules;
+ Assertions.assertThrows(
+ UnsupportedOperationException.class, () ->
tableRulesMutableView.put("x", "y"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesRejectsInvalidAggregation() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
GravitinoMetricsEvaluator.parseEvaluationRules("table:row_count:sum:le"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesRejectsPartitionScopeInRule() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
GravitinoMetricsEvaluator.parseEvaluationRules("partition:row_count:avg:le"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesRejectsRuleWithoutScope() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
GravitinoMetricsEvaluator.parseEvaluationRules("row_count:avg:le"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesRejectsDuplicateRuleInSameScope() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ GravitinoMetricsEvaluator.parseEvaluationRules(
+ "table:row_count:avg:le,table:row_count:max:le"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesRejectsMalformedRuleToken() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
GravitinoMetricsEvaluator.parseEvaluationRules("table:row_count:avg:le:extra"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesRejectsLegacyScopeMetricRuleFormat() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
GravitinoMetricsEvaluator.parseEvaluationRules("table.row_count:avg:le"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesRejectsRuleWithBlankMetricName() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> GravitinoMetricsEvaluator.parseEvaluationRules("table::avg:le"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesRejectsRuleWithBlankScope() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
GravitinoMetricsEvaluator.parseEvaluationRules(":row_count:avg:le"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesRejectsRuleWithBlankAggregation() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
GravitinoMetricsEvaluator.parseEvaluationRules("table:row_count::le"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesRejectsRuleWithBlankComparison() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
GravitinoMetricsEvaluator.parseEvaluationRules("table:row_count:avg:"));
+ }
+
+ @Test
+ public void
testParseEvaluationRulesRejectsDuplicateMetricRuleAfterNormalization() {
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ GravitinoMetricsEvaluator.parseEvaluationRules(
+ "table:Row_Count:avg:le,table:row_count:max:le"));
+ }
+
+ @Test
+ public void testParseEvaluationRulesAllowsBlankRuleEntriesBetweenCommas() {
+ Map<?, ?> parsed =
+ GravitinoMetricsEvaluator.parseEvaluationRules(
+ "table:row_count:avg:le,,job:duration:latest:le,");
+
+ Assertions.assertEquals(2, parsed.size());
+ Map<?, ?> tableRules = findScopeRules(parsed, "TABLE");
+ Map<?, ?> jobRules = findScopeRules(parsed, "JOB");
+ Assertions.assertEquals("AVG:LE", tableRules.get("row_count").toString());
+ Assertions.assertEquals("LATEST:LE", jobRules.get("duration").toString());
+ }
+
+ @Test
+ public void testInitializeAllowsBlankRules() {
+ GravitinoMetricsEvaluator evaluator = new GravitinoMetricsEvaluator();
+ evaluator.initialize(optimizerEnv(" "));
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ MetricScope.forTable(NameIdentifier.parse("catalog.db.table")),
+ Map.of("row_count", List.of(metric(100L, "row_count", 10L))),
+ Map.of("row_count", List.of(metric(110L, "row_count", 999L))));
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testInitializeAllowsMissingRules() {
+ GravitinoMetricsEvaluator evaluator = new GravitinoMetricsEvaluator();
+ evaluator.initialize(new OptimizerEnv(new OptimizerConfig(Map.of())));
+
+ boolean result =
+ evaluator.evaluateMetrics(
+ MetricScope.forJob(NameIdentifier.parse("job1")),
+ Map.of("duration", List.of(metric(100L, "duration", 1L))),
+ Map.of("duration", List.of(metric(110L, "duration", 999L))));
+ Assertions.assertTrue(result);
+ }
+
+ @Test
+ public void testInitializeRejectsInvalidComparison() {
+ GravitinoMetricsEvaluator evaluator = new GravitinoMetricsEvaluator();
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () ->
evaluator.initialize(optimizerEnv("table:row_count:avg:decrease")));
+ }
+
+ private static GravitinoMetricsEvaluator createEvaluator(String rules) {
+ GravitinoMetricsEvaluator evaluator = new GravitinoMetricsEvaluator();
+ evaluator.initialize(optimizerEnv(rules));
+ return evaluator;
+ }
+
+ private static OptimizerEnv optimizerEnv(String rules) {
+ return new OptimizerEnv(
+ new
OptimizerConfig(Map.of(GravitinoMetricsEvaluator.EVALUATION_RULES_CONFIG,
rules)));
+ }
+
+ private static MetricSample metric(long timestamp, String name, long value) {
+ return new MetricSampleImpl(timestamp, entry(name,
StatisticValues.longValue(value)));
+ }
+
+ private static MetricSample metricString(long timestamp, String name, String
value) {
+ return new MetricSampleImpl(timestamp, entry(name,
StatisticValues.stringValue(value)));
+ }
+
+ private static StatisticEntry<?> entry(
+ String name, org.apache.gravitino.stats.StatisticValue<?> value) {
+ return new StatisticEntryImpl(name, value);
+ }
+
+ private static Map<?, ?> findScopeRules(Map<?, ?> parsedRules, String
scopeName) {
+ return parsedRules.entrySet().stream()
+ .filter(entry -> scopeName.equals(entry.getKey().toString()))
+ .map(Map.Entry::getValue)
+ .map(Map.class::cast)
+ .findFirst()
+ .orElseThrow(() -> new AssertionError("Missing expected scope " +
scopeName));
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TableJobRelationProviderForTest.java
similarity index 85%
rename from
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
rename to
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TableJobRelationProviderForTest.java
index 5ab2cebd61..888d06efa3 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TableJobRelationProviderForTest.java
@@ -21,12 +21,12 @@ package
org.apache.gravitino.maintenance.optimizer.monitor.job;
import java.util.List;
import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
+import
org.apache.gravitino.maintenance.optimizer.api.monitor.TableJobRelationProvider;
import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
-public class JobProviderForTest implements JobProvider {
+public class TableJobRelationProviderForTest implements
TableJobRelationProvider {
- public static final String NAME = "job-provider-for-test";
+ public static final String NAME = "table-job-relation-provider-for-test";
public static final NameIdentifier JOB1 =
NameIdentifier.parse("test.db.job1");
public static final NameIdentifier JOB2 =
NameIdentifier.parse("test.db.job2");
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TestLocalTableJobRelationProvider.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TestLocalTableJobRelationProvider.java
new file mode 100644
index 0000000000..2d0b96261f
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TestLocalTableJobRelationProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.job;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.local.LocalTableJobRelationProvider;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestLocalTableJobRelationProvider {
+
+ @TempDir Path tempDir;
+
+ @Test
+ void testGetJobNamesMergesAndNormalizesIdentifiers() throws IOException {
+ Path jobFile = tempDir.resolve("jobs.jsonl");
+ Files.write(
+ jobFile,
+ List.of(
+
"{\"identifier\":\"catalog.schema.table\",\"job-identifiers\":[\"job1\",\"job2\"]}",
+
"{\"identifier\":\"schema.table\",\"job-identifiers\":[\"job2\",\"job3\"]}",
+
"{\"identifier\":\"catalog.schema.table\",\"job-identifiers\":[\"job4\",\"invalid..job\"]}",
+
"{\"identifier\":\"catalog.schema.table\",\"job-identifiers\":[\"\",12]}",
+ "{\"identifier\":\"other.table\",\"job-identifiers\":[\"jobX\"]}",
+ "malformed json"));
+
+ LocalTableJobRelationProvider provider = new
LocalTableJobRelationProvider();
+ OptimizerEnv optimizerEnv = new OptimizerEnv(createConfig(jobFile));
+ provider.initialize(optimizerEnv);
+
+ List<NameIdentifier> jobs =
+ provider.jobIdentifiers(NameIdentifier.parse("catalog.schema.table"));
+
+ Assertions.assertEquals(
+ List.of(
+ NameIdentifier.parse("job1"),
+ NameIdentifier.parse("job2"),
+ NameIdentifier.parse("job3"),
+ NameIdentifier.parse("job4")),
+ jobs);
+ }
+
+ @Test
+ void testInitializeRequiresFilePath() {
+ LocalTableJobRelationProvider provider = new
LocalTableJobRelationProvider();
+ OptimizerEnv optimizerEnv = new OptimizerEnv(new OptimizerConfig());
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () ->
provider.initialize(optimizerEnv));
+ }
+
+ @Test
+ void testInitializeRejectsMissingFilePath() {
+ LocalTableJobRelationProvider provider = new
LocalTableJobRelationProvider();
+ Path missingFile = tempDir.resolve("missing-jobs.jsonl");
+ OptimizerEnv optimizerEnv = new OptimizerEnv(createConfig(missingFile));
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () ->
provider.initialize(optimizerEnv));
+ }
+
+ @Test
+ void testJobIdentifiersRequiresInitialize() {
+ LocalTableJobRelationProvider provider = new
LocalTableJobRelationProvider();
+ Assertions.assertThrows(
+ IllegalStateException.class,
+ () ->
provider.jobIdentifiers(NameIdentifier.parse("catalog.schema.table")));
+ }
+
+ private OptimizerConfig createConfig(Path jobFile) {
+ Map<String, String> configs = new HashMap<>();
+ configs.put(LocalTableJobRelationProvider.JOB_FILE_PATH_CONFIG,
jobFile.toString());
+ configs.put(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG.getKey(),
"catalog");
+ return new OptimizerConfig(configs);
+ }
+}
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
index 3f371c24c1..2f862dcd50 100644
---
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
@@ -28,7 +28,7 @@ import
org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
-import
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import
org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
import org.apache.gravitino.stats.StatisticValue;
import org.apache.gravitino.stats.StatisticValues;
@@ -49,14 +49,14 @@ public class MetricsProviderForTest implements
MetricsProvider {
@Override
public Map<String, List<MetricSample>> jobMetrics(
NameIdentifier jobIdentifier, long startTime, long endTime) {
- if (JobProviderForTest.JOB1.equals(jobIdentifier)) {
+ if (TableJobRelationProviderForTest.JOB1.equals(jobIdentifier)) {
return Map.of(
"duration",
List.of(
metric(99, "duration", StatisticValues.longValue(10L)),
metric(102, "duration", StatisticValues.longValue(20L))));
}
- if (JobProviderForTest.JOB2.equals(jobIdentifier)) {
+ if (TableJobRelationProviderForTest.JOB2.equals(jobIdentifier)) {
return Map.of(
"duration",
List.of(
diff --git
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/TestGravitinoMetricsProvider.java
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/TestGravitinoMetricsProvider.java
new file mode 100644
index 0000000000..e19091e8ce
--- /dev/null
+++
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/TestGravitinoMetricsProvider.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.metrics;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import
org.apache.gravitino.maintenance.optimizer.api.common.PartitionMetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import
org.apache.gravitino.maintenance.optimizer.common.util.StatisticValueUtils;
+import
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.JobMetricWriteRequest;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricRecordImpl;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricsRepository;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.TableMetricWriteRequest;
+import
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestGravitinoMetricsProvider {
+
+ @TempDir Path tempDir;
+
+ @Test
+ public void testReadTableJobAndPartitionMetrics() throws Exception {
+ Path metricsPath = tempDir.resolve("metrics-test");
+ String jdbcUrl = "jdbc:h2:file:" + metricsPath +
";DB_CLOSE_DELAY=-1;MODE=MYSQL";
+ Map<String, String> configs =
+ Map.of(
+ OptimizerConfig.OPTIMIZER_PREFIX
+ + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+ + GenericJdbcMetricsRepository.JDBC_URL,
+ jdbcUrl,
+ OptimizerConfig.OPTIMIZER_PREFIX
+ + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+ + GenericJdbcMetricsRepository.JDBC_USER,
+ "sa",
+ OptimizerConfig.OPTIMIZER_PREFIX
+ + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+ + GenericJdbcMetricsRepository.JDBC_PASSWORD,
+ "");
+ OptimizerEnv optimizerEnv = new OptimizerEnv(new OptimizerConfig(configs));
+
+ NameIdentifier table = NameIdentifier.parse("catalog.db.table1");
+ NameIdentifier job = NameIdentifier.parse("job1");
+ PartitionPath partitionPath =
+ PartitionPath.of(List.of(new PartitionEntryImpl("dt", "2026-02-14")));
+
+ try (MetricsRepository metricsRepository = new
GenericJdbcMetricsRepository()) {
+ metricsRepository.initialize(configs);
+ metricsRepository.storeTableMetrics(
+ List.of(
+ new TableMetricWriteRequest(
+ table,
+ "row_count",
+ Optional.empty(),
+ new MetricRecordImpl(
+ 100L,
StatisticValueUtils.toString(StatisticValues.longValue(10L))))));
+ metricsRepository.storeTableMetrics(
+ List.of(
+ new TableMetricWriteRequest(
+ table,
+ "row_count",
+
Optional.of(PartitionUtils.encodePartitionPath(partitionPath)),
+ new MetricRecordImpl(
+ 101L,
StatisticValueUtils.toString(StatisticValues.longValue(11L))))));
+ metricsRepository.storeJobMetrics(
+ List.of(
+ new JobMetricWriteRequest(
+ job,
+ "duration",
+ new MetricRecordImpl(
+ 102L,
StatisticValueUtils.toString(StatisticValues.longValue(99L))))));
+ }
+
+ GravitinoMetricsProvider provider = new GravitinoMetricsProvider();
+ provider.initialize(optimizerEnv);
+ try {
+ Map<String, List<MetricSample>> tableMetrics =
provider.tableMetrics(table, 0L, 200L);
+ Assertions.assertEquals(1, tableMetrics.get("row_count").size());
+ Assertions.assertEquals(
+ 10L,
tableMetrics.get("row_count").get(0).statistic().value().value());
+
+ Map<String, List<MetricSample>> partitionMetrics =
+ provider.partitionMetrics(table, partitionPath, 0L, 200L);
+ Assertions.assertEquals(1, partitionMetrics.get("row_count").size());
+ Assertions.assertTrue(
+ partitionMetrics.get("row_count").get(0) instanceof
PartitionMetricSample);
+
+ Map<String, List<MetricSample>> jobMetrics = provider.jobMetrics(job,
0L, 200L);
+ Assertions.assertEquals(1, jobMetrics.get("duration").size());
+ Assertions.assertEquals(99L,
jobMetrics.get("duration").get(0).statistic().value().value());
+ } finally {
+ provider.close();
+ }
+ }
+}
diff --git
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
index 4cc9447c73..5048a10cb0 100644
---
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -19,6 +19,6 @@
org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
org.apache.gravitino.maintenance.optimizer.updater.MetricsUpdaterForTest
-org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest
+org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest
org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest