This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 77cd4762f7 [#9984] feat(optimizer): add monitor implementation 
providers and evaluator (#10104)
77cd4762f7 is described below

commit 77cd4762f7c03e55ec134f65c780eb04ebb3b9dd
Author: FANNG <[email protected]>
AuthorDate: Mon Mar 2 12:47:13 2026 +0900

    [#9984] feat(optimizer): add monitor implementation providers and evaluator 
(#10104)
    
    ## What changed are added
    
      ### 1. Added monitor implementations
    
      - Added GravitinoMetricsEvaluator
          - Rule-based evaluation with aggregations: max, min, avg, latest
          - Scoped rules: table.<metric> and job.<metric>
          - Partition scope is evaluated using table-scoped rules
      - Added GravitinoMetricsProvider
      - Added local file-based table-job relation provider:
          - LocalTableJobRelationProvider
          - LocalTableJobRelationReader for JSON-lines table -> jobs mapping
    
      ### 2. Service discovery wiring
    
    - Updated META-INF/services registrations for monitor provider/evaluator
        loading (main and test resources)
    
      ### 3. Behavior and robustness improvements
    
      - Extracted evaluator rule parsing into dedicated initialization logic
    - Better validation and error messages for local relation file
    configuration/
        path
      - Added explicit initialization guards for provider usage
    - Improved read failure diagnostics to include file path and target
    table
        identifier
      - Logging polish:
          - Evaluator per-metric skip logs lowered to DEBUG
          - Keep evaluator summary at INFO
    - Removed duplicated utility-level parse WARN logs from IdentifierUtils
            (callers log context)
    
      ### 4. Naming/semantic cleanup
    
    - Renamed monitor job-provider abstractions to table-job relation
    terminology:
          - JobProvider -> TableJobRelationProvider
          - LocalJobProvider -> LocalTableJobRelationProvider
          - FileJobReader -> LocalTableJobRelationReader
          - DummyJobProvider -> DummyTableJobRelationProvider
    - Updated related method/field names
    (createTableJobRelationProviderInstance,
        etc.)
    
      ## Why this change is needed
    
      - Makes monitor module production-usable instead of interface-only
      - Provides configurable, scope-aware, rule-based evaluation
    - Improves diagnostics and runtime safety for local file-based relation
        loading
      - Reduces log noise and duplicate warnings
    - Clarifies semantics for future extensibility of table-job relation
    sources
    
    Fixes: #9984
    
      ## User-facing changes
    
      Yes.
    
      ### New/updated provider/evaluator names
    
      - gravitino-metrics-evaluator
      - gravitino-metrics-provider
      - local-table-job-relation-provider (renamed)
    
      ### Rule config
    
      - optimizer.monitor.gravitino-metrics-evaluator.rules
    
      ### Property key update (renamed)
    
    - from: optimizer.monitor.local-job-provider.file-path (or old
    file-provider
        variant)
      - to: optimizer.monitor.local-table-job-relation-provider.filePath
    
      ### Rule semantics
    
      - Aggregations: max, min, avg, latest
      - Scopes: table, job
      - Partition metrics use table rules
    
      ## Testing
    
      Executed Spotless and targeted optimizer tests for evaluator/provider/
      relation-provider/monitor flows.
    
      ./gradlew :maintenance:optimizer:spotlessApply
    
      IT_SPARK_HOME=/Users/fanng/deploy/demo/spark-3.5.3-bin-hadoop3 \
    IT_SPARK_ARGS='--conf
    spark.jars=/Users/fanng/deploy/demo/jars/iceberg-spark-
    
    runtime-3.5_2.12-1.9.2.jar,/Users/fanng/deploy/demo/jars/iceberg-aws-bundle-
      1.9.2.jar --conf
    
    
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionEx
    tensions --conf
    spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog
      --conf spark.sql.catalog.rest.type=rest --conf
      spark.sql.catalog.rest.uri=http://127.0.0.1:9001/iceberg/ --conf
    
    
spark.sql.catalog.rest.header.X-Iceberg-Access-Delegation=vended-credentials'
      \
      IT_TABLE_IDENTIFIER=rest.ab.a1 \
      GRAVITINO_ENV_IT=true \
      ./gradlew :maintenance:optimizer:test \
    --tests "org.apache.gravitino.maintenance.optimizer.monitor.TestMonitor"
    \
        --tests
    
    
"org.apache.gravitino.maintenance.optimizer.monitor.evaluator.TestGravitinoMet
      ricsEvaluator" \
        --tests
    
    
"org.apache.gravitino.maintenance.optimizer.monitor.metrics.TestGravitinoMetri
      csProvider" \
        --tests
    
    
"org.apache.gravitino.maintenance.optimizer.monitor.job.TestLocalTableJobRelat
      ionProvider" \
        -PskipIT=true
---
 .../optimizer/api/monitor/MetricsEvaluator.java    |   9 +
 ...Provider.java => TableJobRelationProvider.java} |   2 +-
 .../optimizer/common/conf/OptimizerConfig.java     |  22 +-
 .../optimizer/common/util/IdentifierUtils.java     |  63 ++-
 .../optimizer/common/util/ProviderUtils.java       |   6 +-
 .../maintenance/optimizer/monitor/Monitor.java     |  25 +-
 .../monitor/callback/ConsoleMonitorCallback.java   |  92 ++++
 .../evaluator/GravitinoMetricsEvaluator.java       | 500 +++++++++++++++++++++
 .../job/dummy/DummyTableJobRelationProvider.java}  |  35 +-
 .../job/local/LocalTableJobRelationProvider.java   | 108 +++++
 .../job/local/LocalTableJobRelationReader.java     | 140 ++++++
 .../monitor/metrics/GravitinoMetricsProvider.java  | 149 ++++++
 .../local/AbstractStatisticsImporter.java          |  76 ++--
 ...itino.maintenance.optimizer.api.common.Provider |   4 +
 ...tenance.optimizer.api.monitor.MetricsEvaluator} |   6 +-
 .../optimizer/common/util/TestIdentifierUtils.java |  63 +++
 .../optimizer/common/util/TestProviderUtils.java   |  21 +-
 .../maintenance/optimizer/monitor/TestMonitor.java |  50 ++-
 .../monitor/evaluator/MetricsEvaluatorForTest.java |   4 +-
 .../evaluator/TestGravitinoMetricsEvaluator.java   | 460 +++++++++++++++++++
 ...t.java => TableJobRelationProviderForTest.java} |   6 +-
 .../job/TestLocalTableJobRelationProvider.java     | 102 +++++
 .../monitor/metrics/MetricsProviderForTest.java    |   6 +-
 .../metrics/TestGravitinoMetricsProvider.java      | 122 +++++
 ...itino.maintenance.optimizer.api.common.Provider |   2 +-
 25 files changed, 1968 insertions(+), 105 deletions(-)

diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
index f383ef6169..166222da7a 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/MetricsEvaluator.java
@@ -23,15 +23,24 @@ import java.util.List;
 import java.util.Map;
 import org.apache.gravitino.annotation.DeveloperApi;
 import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
 
 /**
  * Evaluator interface for the table and related job metrics before and after 
optimization actions.
  */
 @DeveloperApi
 public interface MetricsEvaluator {
+
   /** Human-readable evaluator name, primarily for logging and selection. */
   String name();
 
+  /**
+   * Optional initialization hook for evaluators that need runtime 
configuration.
+   *
+   * @param optimizerEnv shared optimizer environment/configuration
+   */
+  default void initialize(OptimizerEnv optimizerEnv) {}
+
   /**
    * Evaluate metrics before/after optimization to decide success/failure.
    *
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java
similarity index 95%
rename from 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
rename to 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java
index 8b4e825582..3fc60d2bfd 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/JobProvider.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/api/monitor/TableJobRelationProvider.java
@@ -26,7 +26,7 @@ import 
org.apache.gravitino.maintenance.optimizer.api.common.Provider;
 
 /** Represents a provider that provides upstream and downstream jobs for a 
table. */
 @DeveloperApi
-public interface JobProvider extends Provider {
+public interface TableJobRelationProvider extends Provider {
   /**
    * List jobs related to the provided table.
    *
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
index ee22f46ead..5d64378ff2 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/conf/OptimizerConfig.java
@@ -26,6 +26,9 @@ import org.apache.gravitino.Config;
 import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.GravitinoMetricsEvaluator;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider;
 import 
org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter;
 import 
org.apache.gravitino.maintenance.optimizer.recommender.statistics.GravitinoStatisticsProvider;
 import 
org.apache.gravitino.maintenance.optimizer.recommender.strategy.GravitinoStrategyProvider;
@@ -58,9 +61,10 @@ public class OptimizerConfig extends Config {
   public static final String UPDATER_PREFIX = OPTIMIZER_PREFIX + "updater.";
   private static final String STATISTICS_UPDATER = UPDATER_PREFIX + 
"statisticsUpdater";
   private static final String METRICS_UPDATER = UPDATER_PREFIX + 
"metricsUpdater";
-  private static final String MONITOR_PREFIX = OPTIMIZER_PREFIX + "monitor.";
+  public static final String MONITOR_PREFIX = OPTIMIZER_PREFIX + "monitor.";
   private static final String METRICS_PROVIDER = MONITOR_PREFIX + 
"metricsProvider";
-  private static final String JOB_PROVIDER = MONITOR_PREFIX + "jobProvider";
+  private static final String TABLE_JOB_RELATION_PROVIDER =
+      MONITOR_PREFIX + "tableJobRelationProvider";
   private static final String METRICS_EVALUATOR = MONITOR_PREFIX + 
"metricsEvaluator";
   private static final String MONITOR_CALLBACKS = MONITOR_PREFIX + "callbacks";
 
@@ -129,16 +133,16 @@ public class OptimizerConfig extends Config {
                   + "discoverable via ServiceLoader. Example: 
'metrics-provider'.")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .create();
+          .createWithDefault(GravitinoMetricsProvider.NAME);
 
-  public static final ConfigEntry<String> JOB_PROVIDER_CONFIG =
-      new ConfigBuilder(JOB_PROVIDER)
+  public static final ConfigEntry<String> TABLE_JOB_RELATION_PROVIDER_CONFIG =
+      new ConfigBuilder(TABLE_JOB_RELATION_PROVIDER)
           .doc(
-              "Monitor job provider implementation name (matches 
Provider.name()) discoverable "
-                  + "via ServiceLoader. Example: 'job-provider'.")
+              "Monitor table-job relation provider implementation name 
(matches Provider.name()) "
+                  + "discoverable via ServiceLoader. Example: 
'table-job-relation-provider'.")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .create();
+          .createWithDefault(DummyTableJobRelationProvider.NAME);
 
   public static final ConfigEntry<String> METRICS_EVALUATOR_CONFIG =
       new ConfigBuilder(METRICS_EVALUATOR)
@@ -148,7 +152,7 @@ public class OptimizerConfig extends Config {
                   + "'metrics-evaluator'.")
           .version(ConfigConstants.VERSION_1_2_0)
           .stringConf()
-          .create();
+          .createWithDefault(GravitinoMetricsEvaluator.NAME);
 
   public static final ConfigEntry<List<String>> MONITOR_CALLBACKS_CONFIG =
       new ConfigBuilder(MONITOR_CALLBACKS)
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
index 415e3b5634..761db7c891 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/IdentifierUtils.java
@@ -20,12 +20,13 @@
 package org.apache.gravitino.maintenance.optimizer.common.util;
 
 import com.google.common.base.Preconditions;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 
 /** Utilities for working with fully qualified table identifiers. */
 public class IdentifierUtils {
-
   private static final String NORMALIZED_IDENTIFIER_MESSAGE =
       "Identifier must be catalog.schema.table";
 
@@ -71,4 +72,64 @@ public class IdentifierUtils {
     Preconditions.checkArgument(
         namespace != null && namespace.levels().length == 2, 
NORMALIZED_IDENTIFIER_MESSAGE);
   }
+
+  /**
+   * Parse table identifier text.
+   *
+   * <p>Accepted forms:
+   *
+   * <ul>
+   *   <li>{@code schema.table}, when default catalog is configured
+   *   <li>{@code catalog.schema.table}
+   * </ul>
+   *
+   * @param identifierText raw identifier text
+   * @param defaultCatalogName optional default catalog for {@code 
schema.table}
+   * @return parsed identifier
+   */
+  public static Optional<NameIdentifier> parseTableIdentifier(
+      String identifierText, String defaultCatalogName) {
+    if (StringUtils.isBlank(identifierText)) {
+      return Optional.empty();
+    }
+
+    try {
+      NameIdentifier parsed = NameIdentifier.parse(identifierText);
+      int levels = parsed.namespace().levels().length;
+      if (levels == 0) {
+        return Optional.empty();
+      }
+      if (levels == 1) {
+        if (StringUtils.isNotBlank(defaultCatalogName)) {
+          return Optional.of(
+              NameIdentifier.of(defaultCatalogName, 
parsed.namespace().levels()[0], parsed.name()));
+        }
+        return Optional.empty();
+      }
+      if (levels == 2) {
+        return Optional.of(parsed);
+      }
+      return Optional.empty();
+    } catch (Exception e) {
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Parse job identifier text.
+   *
+   * @param identifierText raw identifier text
+   * @return parsed identifier
+   */
+  public static Optional<NameIdentifier> parseJobIdentifier(String 
identifierText) {
+    if (StringUtils.isBlank(identifierText)) {
+      return Optional.empty();
+    }
+
+    try {
+      return Optional.of(NameIdentifier.parse(identifierText));
+    } catch (Exception e) {
+      return Optional.empty();
+    }
+  }
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
index bd63e389b0..71b7d36e04 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/common/util/ProviderUtils.java
@@ -25,9 +25,9 @@ import java.util.List;
 import java.util.ServiceLoader;
 import java.util.stream.Collectors;
 import org.apache.gravitino.maintenance.optimizer.api.common.Provider;
-import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
 import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
 import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
+import 
org.apache.gravitino.maintenance.optimizer.api.monitor.TableJobRelationProvider;
 import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
@@ -107,8 +107,8 @@ public class ProviderUtils {
     return createProviderInstance(MetricsProvider.class, provider);
   }
 
-  public static JobProvider createJobProviderInstance(String provider) {
-    return createProviderInstance(JobProvider.class, provider);
+  public static TableJobRelationProvider 
createTableJobRelationProviderInstance(String provider) {
+    return createProviderInstance(TableJobRelationProvider.class, provider);
   }
 
   public static MonitorCallback createMonitorCallbackInstance(String provider) 
{
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
index ebf88358f0..5f01615bb2 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/Monitor.java
@@ -31,11 +31,11 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
 import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
 import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
-import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
 import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
 import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
 import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
 import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
+import 
org.apache.gravitino.maintenance.optimizer.api.monitor.TableJobRelationProvider;
 import org.apache.gravitino.maintenance.optimizer.common.CloseableGroup;
 import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
 import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
@@ -61,7 +61,8 @@ import org.slf4j.LoggerFactory;
  *
  * <ul>
  *   <li>{@link OptimizerConfig#METRICS_PROVIDER_CONFIG} for {@link 
MetricsProvider}.
- *   <li>{@link OptimizerConfig#JOB_PROVIDER_CONFIG} for {@link JobProvider}.
+ *   <li>{@link OptimizerConfig#TABLE_JOB_RELATION_PROVIDER_CONFIG} for {@link
+ *       TableJobRelationProvider}.
  *   <li>{@link OptimizerConfig#METRICS_EVALUATOR_CONFIG} for {@link 
MetricsEvaluator}.
  *   <li>{@link OptimizerConfig#MONITOR_CALLBACKS_CONFIG} for callback list.
  * </ul>
@@ -82,7 +83,7 @@ import org.slf4j.LoggerFactory;
  *   <li>Resolve a time range: {@code [actionTimeSeconds - rangeSeconds, 
actionTimeSeconds +
  *       rangeSeconds]}.
  *   <li>Read table/partition metrics and evaluate them.
- *   <li>Resolve related jobs from {@link JobProvider} and evaluate each job's 
metrics.
+ *   <li>Resolve related jobs from {@link TableJobRelationProvider} and 
evaluate each job's metrics.
  *   <li>Return ordered results (table first, then jobs).
  * </ol>
  */
@@ -90,7 +91,7 @@ public class Monitor implements AutoCloseable {
 
   private static final Logger LOG = LoggerFactory.getLogger(Monitor.class);
   private final MetricsProvider metricsProvider;
-  private final JobProvider jobProvider;
+  private final TableJobRelationProvider tableJobRelationProvider;
   private final MetricsEvaluator metricsEvaluator;
   private final List<MonitorCallback> callbacks;
   private final CloseableGroup closeableGroup = new CloseableGroup();
@@ -107,11 +108,13 @@ public class Monitor implements AutoCloseable {
     metricsProvider.initialize(optimizerEnv);
     closeableGroup.register(metricsProvider, 
MetricsProvider.class.getSimpleName());
 
-    this.jobProvider = loadJobProvider(optimizerEnv.config());
-    jobProvider.initialize(optimizerEnv);
-    closeableGroup.register(jobProvider, JobProvider.class.getSimpleName());
+    this.tableJobRelationProvider = 
loadTableJobRelationProvider(optimizerEnv.config());
+    tableJobRelationProvider.initialize(optimizerEnv);
+    closeableGroup.register(
+        tableJobRelationProvider, 
TableJobRelationProvider.class.getSimpleName());
 
     this.metricsEvaluator = loadMetricsEvaluator(optimizerEnv.config());
+    metricsEvaluator.initialize(optimizerEnv);
     this.callbacks = loadCallbacks(optimizerEnv.config());
     for (MonitorCallback callback : callbacks) {
       callback.initialize(optimizerEnv);
@@ -143,7 +146,7 @@ public class Monitor implements AutoCloseable {
       results.add(
           evaluateTableMetrics(
               metricsEvaluator, tableIdentifier, actionTimeSeconds, 
rangeSeconds, partitionPath));
-      List<NameIdentifier> jobs = jobProvider.jobIdentifiers(tableIdentifier);
+      List<NameIdentifier> jobs = 
tableJobRelationProvider.jobIdentifiers(tableIdentifier);
       if (jobs == null) {
         jobs = List.of();
       }
@@ -269,9 +272,9 @@ public class Monitor implements AutoCloseable {
         optimizerConfig.get(OptimizerConfig.METRICS_PROVIDER_CONFIG));
   }
 
-  private JobProvider loadJobProvider(OptimizerConfig optimizerConfig) {
-    return ProviderUtils.createJobProviderInstance(
-        optimizerConfig.get(OptimizerConfig.JOB_PROVIDER_CONFIG));
+  private TableJobRelationProvider 
loadTableJobRelationProvider(OptimizerConfig optimizerConfig) {
+    return ProviderUtils.createTableJobRelationProviderInstance(
+        
optimizerConfig.get(OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG));
   }
 
   private MetricsEvaluator loadMetricsEvaluator(OptimizerConfig 
optimizerConfig) {
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/ConsoleMonitorCallback.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/ConsoleMonitorCallback.java
new file mode 100644
index 0000000000..2bb2af4acd
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/callback/ConsoleMonitorCallback.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.callback;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.EvaluationResult;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MonitorCallback;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+
+/** Built-in callback that prints evaluation results and metric summaries to 
console. */
+public class ConsoleMonitorCallback implements MonitorCallback {
+
+  public static final String NAME = "console";
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {}
+
+  @Override
+  public void onEvaluation(EvaluationResult result) {
+    System.out.println(
+        String.format(
+            "MONITOR: time=%s scope=%s identifier=%s%s evaluation=%s 
evaluator=%s",
+            Instant.now(),
+            result.scope().type(),
+            result.scope().identifier(),
+            result.scope().partition().isPresent()
+                ? " partition=" + result.scope().partition().get()
+                : "",
+            result.evaluation(),
+            result.evaluatorName()));
+    if (!result.beforeMetrics().isEmpty()) {
+      System.out.println("METRICS BEFORE: " + 
formatMetrics(result.beforeMetrics()));
+    }
+    if (!result.afterMetrics().isEmpty()) {
+      System.out.println("METRICS AFTER: " + 
formatMetrics(result.afterMetrics()));
+    }
+  }
+
+  @Override
+  public void close() throws Exception {}
+
+  private String formatMetrics(Map<String, List<MetricSample>> metrics) {
+    if (metrics == null || metrics.isEmpty()) {
+      return "{}";
+    }
+    return metrics.entrySet().stream()
+        .map(entry -> entry.getKey() + "=" + 
formatMetricSamples(entry.getValue()))
+        .collect(Collectors.joining(", ", "{", "}"));
+  }
+
+  private String formatMetricSamples(List<MetricSample> samples) {
+    if (samples == null || samples.isEmpty()) {
+      return "[]";
+    }
+    return samples.stream()
+        .map(sample -> sample.timestamp() + ":" + formatMetricValue(sample))
+        .collect(Collectors.joining(", ", "[", "]"));
+  }
+
+  private String formatMetricValue(MetricSample sample) {
+    if (sample == null || sample.statistic() == null || 
sample.statistic().value() == null) {
+      return "N/A";
+    }
+    return String.valueOf(sample.statistic().value().value());
+  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/GravitinoMetricsEvaluator.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/GravitinoMetricsEvaluator.java
new file mode 100644
index 0000000000..6c5798108f
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/GravitinoMetricsEvaluator.java
@@ -0,0 +1,500 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.evaluator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalDouble;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Built-in {@link MetricsEvaluator} for comparing before/after metrics with 
configurable rules.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li>Set {@link OptimizerConfig#METRICS_EVALUATOR_CONFIG} to {@value 
#NAME}.
+ *   <li>Configure {@link #EVALUATION_RULES_CONFIG} with comma-separated rule 
entries.
+ * </ul>
+ *
+ * <p>Rule format:
+ *
+ * <ul>
+ *   <li>{@code scope:metricName:aggregation:comparison}
+ *   <li>{@code scope}: {@code table|job}
+ *   <li>{@code aggregation}: {@code max|min|avg|latest}
+ *   <li>{@code comparison}: {@code lt|le|gt|ge|eq|ne}
+ * </ul>
+ *
+ * <p>Evaluation semantics:
+ *
+ * <ul>
+ *   <li>Each rule selects one metric and computes aggregated {@code before} 
and {@code after}
+ *       values.
+ *   <li>The configured comparison is applied as {@code compare(after, 
before)}.
+ *   <li>Partition scopes reuse table rules.
+ *   <li>Rules with insufficient/non-aggregatable samples are skipped.
+ *   <li>The overall result is pass when no rule fails.
+ * </ul>
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * gravitino.optimizer.monitor.metricsEvaluator = gravitino-metrics-evaluator
+ * gravitino.optimizer.monitor.gravitinoMetricsEvaluator.rules =
+ *   table:row_count:avg:le,job:duration:latest:le
+ * }</pre>
+ */
+public class GravitinoMetricsEvaluator implements MetricsEvaluator {
+
+  public static final String NAME = "gravitino-metrics-evaluator";
+  public static final String CONFIG_NAME = "gravitinoMetricsEvaluator";
+
+  /**
+   * Comma-separated evaluation rules in format: {@code 
scope:metricName:aggregation:comparison}.
+   * Supported scope values are {@code table|job}. Supported aggregation 
values are {@code
+   * max|min|avg|latest}. Supported comparison values are {@code 
lt|le|gt|ge|eq|ne}. Table rules are
+   * applied to both table and partition metric scopes.
+   */
+  public static final String EVALUATION_RULES_CONFIG =
+      OptimizerConfig.MONITOR_PREFIX + CONFIG_NAME + ".rules";
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoMetricsEvaluator.class);
+
+  private Map<RuleScope, Map<String, RuleConfig>> metricRulesByScope = 
Map.of();
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {
+    String rawRules = 
optimizerEnv.config().getRawString(EVALUATION_RULES_CONFIG);
+    this.metricRulesByScope = parseEvaluationRules(rawRules);
+    if (metricRulesByScope.isEmpty()) {
+      LOG.warn(
+          "No evaluator rules configured in {}. All evaluations will pass 
until rules are set.",
+          EVALUATION_RULES_CONFIG);
+      return;
+    }
+    LOG.info(
+        "Loaded {} scoped metric evaluation rules for {}: {}",
+        ruleCount(metricRulesByScope),
+        NAME,
+        formatRules(metricRulesByScope));
+  }
+
+  /**
+   * Parse evaluator rules from raw config text.
+   *
+   * <p>Rule format: {@code scope:metricName:aggregation:comparison}, 
separated by commas.
+   *
+   * @param rawRules raw rule text from config
+   * @return immutable scoped rule map, or empty map when input is blank
+   */
+  static Map<RuleScope, Map<String, RuleConfig>> parseEvaluationRules(String 
rawRules) {
+    if (StringUtils.isBlank(rawRules)) {
+      return Map.of();
+    }
+
+    Map<RuleScope, Map<String, RuleConfig>> parsedRules = new 
LinkedHashMap<>();
+    String[] entries = rawRules.split(",");
+    for (String entry : entries) {
+      String trimmed = entry == null ? "" : entry.trim();
+      if (trimmed.isEmpty()) {
+        continue;
+      }
+
+      String[] parts = trimmed.split(":", -1);
+      if (parts.length != 4) {
+        throw new IllegalArgumentException(
+            "Invalid evaluator rule '"
+                + trimmed
+                + "'. Expected format is 
scope:metricName:aggregation:comparison");
+      }
+
+      String scopeText = parts[0].trim();
+      String metricText = parts[1].trim();
+      String aggregationText = parts[2].trim();
+      String comparisonText = parts[3].trim();
+
+      RuleScope scope = RuleScope.fromConfigValue(scopeText);
+      String metricName = normalizeMetricName(metricText);
+      if (metricName.isEmpty()) {
+        throw new IllegalArgumentException("Metric name must not be blank in 
rule: " + trimmed);
+      }
+      AggregationOp aggregation = AggregationOp.from(aggregationText);
+      ComparisonOp comparison = ComparisonOp.from(comparisonText);
+
+      Map<String, RuleConfig> scopeRules =
+          parsedRules.computeIfAbsent(scope, ignored -> new LinkedHashMap<>());
+      RuleConfig previous =
+          scopeRules.putIfAbsent(metricName, new RuleConfig(aggregation, 
comparison));
+      if (previous != null) {
+        throw new IllegalArgumentException(
+            "Duplicate metric rule for '"
+                + scope.configValue
+                + ":"
+                + metricName
+                + "' in "
+                + EVALUATION_RULES_CONFIG);
+      }
+    }
+
+    if (parsedRules.isEmpty()) {
+      return Map.of();
+    }
+
+    Map<RuleScope, Map<String, RuleConfig>> immutableRules = new 
LinkedHashMap<>();
+    for (Map.Entry<RuleScope, Map<String, RuleConfig>> entry : 
parsedRules.entrySet()) {
+      immutableRules.put(entry.getKey(), 
Collections.unmodifiableMap(entry.getValue()));
+    }
+    return Collections.unmodifiableMap(immutableRules);
+  }
+
+  @Override
+  public boolean evaluateMetrics(
+      MetricScope scope,
+      Map<String, List<MetricSample>> beforeMetrics,
+      Map<String, List<MetricSample>> afterMetrics) {
+    if (metricRulesByScope.isEmpty()) {
+      return true;
+    }
+
+    RuleScope ruleScope = RuleScope.fromMetricScopeType(scope.type());
+    Map<String, RuleConfig> rules = metricRulesByScope.getOrDefault(ruleScope, 
Map.of());
+    if (rules.isEmpty()) {
+      LOG.info(
+          "No evaluator rules configured for scope={}, identifier={}, skip 
evaluation",
+          scope.type(),
+          scope.identifier());
+      return true;
+    }
+
+    Map<String, List<MetricSample>> before = beforeMetrics == null ? Map.of() 
: beforeMetrics;
+    Map<String, List<MetricSample>> after = afterMetrics == null ? Map.of() : 
afterMetrics;
+
+    int evaluatedCount = 0;
+    int failedCount = 0;
+    int skippedCount = 0;
+    for (Map.Entry<String, RuleConfig> rule : rules.entrySet()) {
+      evaluatedCount++;
+      String metricName = rule.getKey();
+      RuleConfig ruleConfig = rule.getValue();
+      AggregationOp aggregation = ruleConfig.aggregation;
+      ComparisonOp comparison = ruleConfig.comparison;
+
+      List<MetricSample> beforeSamples = 
sanitizeSamples(findMetricSamples(before, metricName));
+      List<MetricSample> afterSamples = 
sanitizeSamples(findMetricSamples(after, metricName));
+      if (beforeSamples.isEmpty() || afterSamples.isEmpty()) {
+        LOG.debug(
+            "Metric {} of {} ({}) has insufficient samples: beforeSize={}, 
afterSize={}",
+            metricName,
+            scope.identifier(),
+            scope.type(),
+            beforeSamples.size(),
+            afterSamples.size());
+        skippedCount++;
+        continue;
+      }
+
+      OptionalDouble beforeValue = aggregate(beforeSamples, aggregation);
+      OptionalDouble afterValue = aggregate(afterSamples, aggregation);
+      if (beforeValue.isEmpty() || afterValue.isEmpty()) {
+        LOG.debug(
+            "Metric {} of {} ({}) cannot be aggregated by {}",
+            metricName,
+            scope.identifier(),
+            scope.type(),
+            aggregation.configValue);
+        skippedCount++;
+        continue;
+      }
+
+      double beforeNumber = beforeValue.getAsDouble();
+      double afterNumber = afterValue.getAsDouble();
+      boolean passed = comparison.test(afterNumber, beforeNumber);
+      LOG.debug(
+          "Metric {} of {} ({}) using {}:{}: before={}, after={}, passed={}",
+          metricName,
+          scope.identifier(),
+          scope.type(),
+          aggregation.configValue,
+          comparison.configValue,
+          beforeNumber,
+          afterNumber,
+          passed);
+      if (!passed) {
+        failedCount++;
+      }
+    }
+
+    boolean allPassed = failedCount == 0;
+    LOG.info(
+        "Evaluated {} metrics for {} ({}), passed={}, failed={}, skipped={}",
+        evaluatedCount,
+        scope.identifier(),
+        scope.type(),
+        allPassed,
+        failedCount,
+        skippedCount);
+    return allPassed;
+  }
+
+  private static int ruleCount(Map<RuleScope, Map<String, RuleConfig>> 
rulesByScope) {
+    return rulesByScope.values().stream().mapToInt(Map::size).sum();
+  }
+
+  private static String formatRules(Map<RuleScope, Map<String, RuleConfig>> 
rulesByScope) {
+    return rulesByScope.entrySet().stream()
+        .flatMap(
+            scopeEntry ->
+                scopeEntry.getValue().entrySet().stream()
+                    .map(
+                        metricEntry ->
+                            scopeEntry.getKey().configValue
+                                + ":"
+                                + metricEntry.getKey()
+                                + ":"
+                                + 
metricEntry.getValue().aggregation.configValue
+                                + ":"
+                                + 
metricEntry.getValue().comparison.configValue))
+        .collect(Collectors.joining(", "));
+  }
+
+  private static List<MetricSample> sanitizeSamples(List<MetricSample> 
samples) {
+    return samples == null ? List.of() : samples;
+  }
+
+  private static List<MetricSample> findMetricSamples(
+      Map<String, List<MetricSample>> metrics, String normalizedMetricName) {
+    if (metrics == null || metrics.isEmpty()) {
+      return List.of();
+    }
+    List<MetricSample> exactMatch = metrics.get(normalizedMetricName);
+    if (exactMatch != null) {
+      return exactMatch;
+    }
+    for (Map.Entry<String, List<MetricSample>> entry : metrics.entrySet()) {
+      if (normalizeMetricName(entry.getKey()).equals(normalizedMetricName)) {
+        return entry.getValue();
+      }
+    }
+    return List.of();
+  }
+
+  private static String normalizeMetricName(String metricName) {
+    return metricName == null ? "" : 
metricName.trim().toLowerCase(Locale.ROOT);
+  }
+
+  private static OptionalDouble aggregate(List<MetricSample> metrics, 
AggregationOp operation) {
+    try {
+      return switch (operation) {
+        case MAX -> metrics.stream()
+            .mapToDouble(GravitinoMetricsEvaluator::metricNumericValue)
+            .max();
+        case MIN -> metrics.stream()
+            .mapToDouble(GravitinoMetricsEvaluator::metricNumericValue)
+            .min();
+        case AVG -> metrics.stream()
+            .mapToDouble(GravitinoMetricsEvaluator::metricNumericValue)
+            .average();
+        case LATEST -> latest(metrics);
+      };
+    } catch (Exception e) {
+      return OptionalDouble.empty();
+    }
+  }
+
+  private static OptionalDouble latest(List<MetricSample> metrics) {
+    Optional<MetricSample> latest =
+        
metrics.stream().max(Comparator.comparingLong(MetricSample::timestamp));
+    return latest
+        .map(sample -> OptionalDouble.of(metricNumericValue(sample)))
+        .orElseGet(OptionalDouble::empty);
+  }
+
+  private static double metricNumericValue(MetricSample metric) {
+    Object value = metric.statistic().value().value();
+    if (value instanceof Number) {
+      return ((Number) value).doubleValue();
+    }
+    throw new IllegalArgumentException(
+        "Metric value must be numeric, but got "
+            + (value == null ? "null" : value.getClass().getName()));
+  }
+
+  private enum RuleScope {
+    TABLE("table"),
+    JOB("job");
+
+    private final String configValue;
+
+    RuleScope(String configValue) {
+      this.configValue = configValue;
+    }
+
+    private static RuleScope fromConfigValue(String value) {
+      String normalized = value == null ? "" : 
value.trim().toLowerCase(Locale.ROOT);
+      for (RuleScope scope : values()) {
+        if (scope.configValue.equals(normalized)) {
+          return scope;
+        }
+      }
+      throw new IllegalArgumentException(
+          "Unsupported rule scope '"
+              + value
+              + "'. Supported scopes are: "
+              + RuleScope.supportedValues());
+    }
+
+    private static RuleScope fromMetricScopeType(MetricScope.Type type) {
+      return switch (type) {
+        case TABLE -> TABLE;
+        case PARTITION -> TABLE;
+        case JOB -> JOB;
+      };
+    }
+
+    private static String supportedValues() {
+      return Arrays.stream(values())
+          .map(scope -> scope.configValue)
+          .collect(Collectors.joining("|"));
+    }
+  }
+
+  private enum AggregationOp {
+    MAX("max"),
+    MIN("min"),
+    AVG("avg"),
+    LATEST("latest");
+
+    private final String configValue;
+
+    AggregationOp(String configValue) {
+      this.configValue = configValue;
+    }
+
+    private static AggregationOp from(String value) {
+      String normalized = value == null ? "" : 
value.trim().toLowerCase(Locale.ROOT);
+      for (AggregationOp op : values()) {
+        if (op.configValue.equals(normalized)) {
+          return op;
+        }
+      }
+      throw new IllegalArgumentException(
+          "Unsupported aggregation operation '"
+              + value
+              + "'. Supported operations are: "
+              + supportedValues());
+    }
+
+    private static String supportedValues() {
+      return Arrays.stream(values()).map(op -> 
op.configValue).collect(Collectors.joining("|"));
+    }
+  }
+
+  /**
+   * Comparison operation used to verify aggregated {@code after} value 
against aggregated {@code
+   * before} value.
+   */
+  private enum ComparisonOp {
+    LT("lt"),
+    LE("le"),
+    GT("gt"),
+    GE("ge"),
+    EQ("eq"),
+    NE("ne");
+
+    private static final double EPSILON = 1e-9;
+    private final String configValue;
+
+    ComparisonOp(String configValue) {
+      this.configValue = configValue;
+    }
+
+    /**
+     * Apply this comparison to evaluated numbers.
+     *
+     * @param left evaluated "after" value
+     * @param right evaluated "before" value
+     * @return true if the comparison passes
+     */
+    private boolean test(double left, double right) {
+      return switch (this) {
+        case LT -> left < right;
+        case LE -> left <= right;
+        case GT -> left > right;
+        case GE -> left >= right;
+        case EQ -> Math.abs(left - right) <= EPSILON;
+        case NE -> Math.abs(left - right) > EPSILON;
+      };
+    }
+
+    private static ComparisonOp from(String value) {
+      String normalized = value == null ? "" : 
value.trim().toLowerCase(Locale.ROOT);
+      for (ComparisonOp op : values()) {
+        if (op.configValue.equals(normalized)) {
+          return op;
+        }
+      }
+      throw new IllegalArgumentException(
+          "Unsupported comparison operation '"
+              + value
+              + "'. Supported comparisons are: "
+              + supportedValues());
+    }
+
+    private static String supportedValues() {
+      return Arrays.stream(values()).map(op -> 
op.configValue).collect(Collectors.joining("|"));
+    }
+  }
+
+  /** Parsed rule definition for one metric, including aggregation and 
comparison semantics. */
+  private static class RuleConfig {
+    private final AggregationOp aggregation;
+    private final ComparisonOp comparison;
+
+    private RuleConfig(AggregationOp aggregation, ComparisonOp comparison) {
+      this.aggregation = aggregation;
+      this.comparison = comparison;
+    }
+
+    @Override
+    public String toString() {
+      return aggregation.name() + ":" + comparison.name();
+    }
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/dummy/DummyTableJobRelationProvider.java
similarity index 60%
copy from 
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
copy to 
maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/dummy/DummyTableJobRelationProvider.java
index 5ab2cebd61..38d9925961 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/dummy/DummyTableJobRelationProvider.java
@@ -17,18 +17,36 @@
  * under the License.
  */
 
-package org.apache.gravitino.maintenance.optimizer.monitor.job;
+package org.apache.gravitino.maintenance.optimizer.monitor.job.dummy;
 
 import java.util.List;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
+import 
org.apache.gravitino.maintenance.optimizer.api.monitor.TableJobRelationProvider;
 import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
 
-public class JobProviderForTest implements JobProvider {
+/**
+ * No-op {@link TableJobRelationProvider} implementation.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li>Use this provider when you only want table/partition metric 
evaluation.
+ *   <li>No external configuration is required.
+ *   <li>Always returns an empty job identifier list.
+ * </ul>
+ *
+ * <p>Configured by setting {@link 
OptimizerConfig#TABLE_JOB_RELATION_PROVIDER_CONFIG} to {@value
+ * #NAME}.
+ */
+public class DummyTableJobRelationProvider implements TableJobRelationProvider 
{
+
+  public static final String NAME = "dummy-table-job-relation-provider";
 
-  public static final String NAME = "job-provider-for-test";
-  public static final NameIdentifier JOB1 = 
NameIdentifier.parse("test.db.job1");
-  public static final NameIdentifier JOB2 = 
NameIdentifier.parse("test.db.job2");
+  @Override
+  public List<NameIdentifier> jobIdentifiers(NameIdentifier tableIdentifier) {
+    return List.of();
+  }
 
   @Override
   public String name() {
@@ -38,11 +56,6 @@ public class JobProviderForTest implements JobProvider {
   @Override
   public void initialize(OptimizerEnv optimizerEnv) {}
 
-  @Override
-  public List<NameIdentifier> jobIdentifiers(NameIdentifier tableIdentifier) {
-    return List.of(JOB1, JOB2);
-  }
-
   @Override
   public void close() throws Exception {}
 }
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationProvider.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationProvider.java
new file mode 100644
index 0000000000..8571397a13
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.job.local;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import 
org.apache.gravitino.maintenance.optimizer.api.monitor.TableJobRelationProvider;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+
+/**
+ * A {@link TableJobRelationProvider} that reads table-to-jobs mappings from a 
local JSON-lines
+ * file.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li>Set {@link OptimizerConfig#TABLE_JOB_RELATION_PROVIDER_CONFIG} to 
{@value #NAME}.
+ *   <li>Set {@link #JOB_FILE_PATH_CONFIG} ({@code
+ *       gravitino.optimizer.monitor.localTableJobRelationProvider.filePath}) 
to a readable
+ *       JSON-lines file path.
+ *   <li>Optional: set {@link 
OptimizerConfig#GRAVITINO_DEFAULT_CATALOG_CONFIG} to resolve two-level
+ *       table identifiers ({@code schema.table}) into {@code 
<defaultCatalog>.schema.table}.
+ * </ul>
+ *
+ * <p>Each line in the job file is a JSON object:
+ *
+ * <pre>{@code
+ * {"identifier":"catalog.schema.table","job-identifiers":["job1","job2"]}
+ * {"identifier":"schema.table","job-identifiers":["job3"]}
+ * }</pre>
+ *
+ * <p>Format rules:
+ *
+ * <ul>
+ *   <li>{@code identifier}: table name, either {@code catalog.schema.table} 
or {@code schema.table}
+ *       (when default catalog is configured).
+ *   <li>{@code job-identifiers}: array of job identifiers, each parseable by 
{@link
+ *       NameIdentifier#parse(String)}.
+ *   <li>Malformed lines and invalid entries are skipped with warning logs.
+ * </ul>
+ */
+public class LocalTableJobRelationProvider implements TableJobRelationProvider 
{
+
+  public static final String NAME = "local-table-job-relation-provider";
+  public static final String CONFIG_NAME = "localTableJobRelationProvider";
+
+  /** Config key for the local JSON-lines job mapping file path. */
+  public static final String JOB_FILE_PATH_CONFIG =
+      OptimizerConfig.MONITOR_PREFIX + CONFIG_NAME + ".filePath";
+
+  private LocalTableJobRelationReader jobReader;
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {
+    String path = optimizerEnv.config().getRawString(JOB_FILE_PATH_CONFIG);
+    if (StringUtils.isBlank(path)) {
+      throw new IllegalArgumentException(
+          JOB_FILE_PATH_CONFIG + " must be provided for 
LocalTableJobRelationProvider");
+    }
+    Path jobFilePath = Path.of(path).toAbsolutePath().normalize();
+    if (!Files.exists(jobFilePath)
+        || !Files.isRegularFile(jobFilePath)
+        || !Files.isReadable(jobFilePath)) {
+      throw new IllegalArgumentException(
+          "Configured job file path is not a readable file: " + jobFilePath);
+    }
+    String defaultCatalog =
+        
optimizerEnv.config().get(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG);
+    this.jobReader = new LocalTableJobRelationReader(jobFilePath, 
defaultCatalog);
+  }
+
+  @Override
+  public List<NameIdentifier> jobIdentifiers(NameIdentifier tableIdentifier) {
+    if (jobReader == null) {
+      throw new IllegalStateException("LocalTableJobRelationProvider is not 
initialized");
+    }
+    return jobReader.readJobIdentifiers(tableIdentifier);
+  }
+
+  @Override
+  public void close() throws Exception {}
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationReader.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationReader.java
new file mode 100644
index 0000000000..0e01cb7ba3
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/job/local/LocalTableJobRelationReader.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.job.local;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.json.JsonUtils;
+import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Shared reader for file-based table-job relation providers. */
+class LocalTableJobRelationReader {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(LocalTableJobRelationReader.class);
+
+  private final Path jobFilePath;
+  private final String defaultCatalogName;
+
+  LocalTableJobRelationReader(Path jobFilePath, String defaultCatalogName) {
+    Preconditions.checkArgument(jobFilePath != null, "jobFilePath cannot be 
null");
+    this.jobFilePath = jobFilePath;
+    this.defaultCatalogName = defaultCatalogName;
+  }
+
+  List<NameIdentifier> readJobIdentifiers(NameIdentifier tableIdentifier) {
+    Set<NameIdentifier> jobs = new LinkedHashSet<>();
+
+    try (BufferedReader reader = Files.newBufferedReader(jobFilePath, 
StandardCharsets.UTF_8)) {
+      String line;
+      int lineNumber = 0;
+      while ((line = reader.readLine()) != null) {
+        lineNumber++;
+        if (StringUtils.isBlank(line)) {
+          continue;
+        }
+
+        JobMappingLine mappingLine = parseJsonLine(line, lineNumber);
+        if (mappingLine == null) {
+          continue;
+        }
+
+        Optional<NameIdentifier> identifier = 
parseIdentifier(mappingLine.identifier, lineNumber);
+        if (identifier.isEmpty() || !identifier.get().equals(tableIdentifier)) 
{
+          continue;
+        }
+
+        collectJobs(mappingLine.jobIdentifiers, jobs, lineNumber);
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(
+          String.format(
+              "Failed to read job mappings from file %s for table %s",
+              jobFilePath, tableIdentifier),
+          e);
+    }
+
+    return List.copyOf(jobs);
+  }
+
+  private JobMappingLine parseJsonLine(String line, int lineNumber) {
+    try {
+      return JsonUtils.anyFieldMapper().readValue(line, JobMappingLine.class);
+    } catch (IOException e) {
+      LOG.warn("Skip malformed job mapping at line {}: {}", lineNumber, line);
+      return null;
+    }
+  }
+
+  private Optional<NameIdentifier> parseIdentifier(String identifierText, int 
lineNumber) {
+    if (StringUtils.isBlank(identifierText)) {
+      return Optional.empty();
+    }
+
+    Optional<NameIdentifier> parsed =
+        IdentifierUtils.parseTableIdentifier(identifierText, 
defaultCatalogName);
+    if (parsed.isEmpty()) {
+      LOG.warn("Skip invalid table identifier at line {}: {}", lineNumber, 
identifierText);
+    }
+    return parsed;
+  }
+
+  private void collectJobs(List<Object> jobIdentifiers, Set<NameIdentifier> 
jobs, int lineNumber) {
+    if (jobIdentifiers == null || jobIdentifiers.isEmpty()) {
+      return;
+    }
+    for (Object jobIdentifier : jobIdentifiers) {
+      if (!(jobIdentifier instanceof String)) {
+        continue;
+      }
+      String jobText = (String) jobIdentifier;
+      if (StringUtils.isBlank(jobText)) {
+        continue;
+      }
+      try {
+        jobs.add(NameIdentifier.parse(jobText));
+      } catch (Exception e) {
+        LOG.warn("Skip invalid job identifier at line {}: {}", lineNumber, 
jobText);
+      }
+    }
+  }
+
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  private static class JobMappingLine {
+    @JsonProperty("identifier")
+    private String identifier;
+
+    @JsonProperty("job-identifiers")
+    private List<Object> jobIdentifiers;
+  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/GravitinoMetricsProvider.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/GravitinoMetricsProvider.java
new file mode 100644
index 0000000000..a7a1b5c56c
--- /dev/null
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/GravitinoMetricsProvider.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.metrics;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
+import org.apache.gravitino.maintenance.optimizer.common.MetricSampleImpl;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import 
org.apache.gravitino.maintenance.optimizer.common.PartitionMetricSampleImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import 
org.apache.gravitino.maintenance.optimizer.common.util.StatisticValueUtils;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricRecord;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricsRepository;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+
+/**
+ * {@link MetricsProvider} implementation backed by Gravitino metric storage.
+ *
+ * <p>Usage:
+ *
+ * <ul>
+ *   <li>Set {@link 
org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig
+ *       #METRICS_PROVIDER_CONFIG} to {@value #NAME}.
+ *   <li>This provider initializes an internal {@link 
GenericJdbcMetricsRepository} and reads
+ *       metrics through {@link MetricsRepository} APIs.
+ * </ul>
+ *
+ * <p>Behavior:
+ *
+ * <ul>
+ *   <li>{@link #jobMetrics(NameIdentifier, long, long)} returns job metrics 
in the requested time
+ *       range.
+ *   <li>{@link #tableMetrics(NameIdentifier, long, long)} returns table 
metrics in the requested
+ *       time range.
+ *   <li>{@link #partitionMetrics(NameIdentifier, PartitionPath, long, long)} 
returns partition
+ *       metrics using encoded partition path.
+ *   <li>Storage records are converted to monitor-domain {@link MetricSample} 
values.
+ * </ul>
+ */
+public class GravitinoMetricsProvider implements MetricsProvider {
+
+  public static final String NAME = "gravitino-metrics-provider";
+  private MetricsRepository metricsRepository;
+
+  @Override
+  public String name() {
+    return NAME;
+  }
+
+  @Override
+  public void initialize(OptimizerEnv optimizerEnv) {
+    MetricsRepository repository = new GenericJdbcMetricsRepository();
+    repository.initialize(optimizerEnv.config().getAllConfig());
+    this.metricsRepository = repository;
+  }
+
+  @Override
+  public Map<String, List<MetricSample>> jobMetrics(
+      NameIdentifier jobIdentifier, long startTime, long endTime) {
+    ensureInitialized();
+    Map<String, List<MetricRecord>> metrics =
+        metricsRepository.getJobMetrics(jobIdentifier, startTime, endTime);
+
+    return toSingleMetrics(metrics, Optional.empty());
+  }
+
+  @Override
+  public Map<String, List<MetricSample>> tableMetrics(
+      NameIdentifier tableIdentifier, long startTime, long endTime) {
+    ensureInitialized();
+    Map<String, List<MetricRecord>> metrics =
+        metricsRepository.getTableMetrics(tableIdentifier, startTime, endTime);
+
+    return toSingleMetrics(metrics, Optional.empty());
+  }
+
+  @Override
+  public Map<String, List<MetricSample>> partitionMetrics(
+      NameIdentifier tableIdentifier, PartitionPath partitionPath, long 
startTime, long endTime) {
+    ensureInitialized();
+    Map<String, List<MetricRecord>> metrics =
+        metricsRepository.getPartitionMetrics(
+            tableIdentifier, 
PartitionUtils.encodePartitionPath(partitionPath), startTime, endTime);
+
+    return toSingleMetrics(metrics, Optional.of(partitionPath));
+  }
+
+  private Map<String, List<MetricSample>> toSingleMetrics(
+      Map<String, List<MetricRecord>> metrics, Optional<PartitionPath> 
partitionPath) {
+    return metrics.entrySet().stream()
+        .collect(
+            Collectors.toMap(
+                entry -> entry.getKey(),
+                entry ->
+                    entry.getValue().stream()
+                        .map(
+                            storageMetric ->
+                                toSingleMetric(storageMetric, partitionPath, 
entry.getKey()))
+                        .collect(Collectors.toList())));
+  }
+
+  private MetricSample toSingleMetric(
+      MetricRecord metric, Optional<PartitionPath> partitionPath, String 
metricName) {
+    StatisticEntry<?> statistic =
+        new StatisticEntryImpl<>(metricName, 
StatisticValueUtils.fromString(metric.getValue()));
+    return partitionPath
+        .<MetricSample>map(
+            partition -> new PartitionMetricSampleImpl(metric.getTimestamp(), 
statistic, partition))
+        .orElseGet(() -> new MetricSampleImpl(metric.getTimestamp(), 
statistic));
+  }
+
+  private void ensureInitialized() {
+    Preconditions.checkState(
+        metricsRepository != null, "GravitinoMetricsProvider is not 
initialized");
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (metricsRepository != null) {
+      metricsRepository.close();
+    }
+  }
+}
diff --git 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/calculator/local/AbstractStatisticsImporter.java
 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/calculator/local/AbstractStatisticsImporter.java
index 0bb5a693b0..4b87faa118 100644
--- 
a/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/calculator/local/AbstractStatisticsImporter.java
+++ 
b/maintenance/optimizer/src/main/java/org/apache/gravitino/maintenance/optimizer/updater/calculator/local/AbstractStatisticsImporter.java
@@ -44,6 +44,7 @@ import 
org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
 import 
org.apache.gravitino.maintenance.optimizer.api.common.TableAndPartitionStatistics;
 import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
 import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.util.IdentifierUtils;
 import org.apache.gravitino.stats.StatisticValue;
 import org.apache.gravitino.stats.StatisticValues;
 import org.slf4j.Logger;
@@ -142,16 +143,16 @@ abstract class AbstractStatisticsImporter implements 
StatisticsImporter {
         new StatisticsRecordVisitor() {
           @Override
           public void onTable(StatisticsRecord record) {
-            NameIdentifier identifier = 
parseTableIdentifier(record.identifier());
-            if (identifier == null) {
+            Optional<NameIdentifier> identifier = 
parseTableIdentifier(record.identifier());
+            if (identifier.isEmpty()) {
               return;
             }
-            if (targetIdentifier != null && 
!targetIdentifier.equals(identifier)) {
+            if (targetIdentifier != null && 
!targetIdentifier.equals(identifier.get())) {
               return;
             }
 
             Map<String, StatisticValue<?>> statisticsByName =
-                aggregated.computeIfAbsent(identifier, k -> new 
LinkedHashMap<>());
+                aggregated.computeIfAbsent(identifier.get(), k -> new 
LinkedHashMap<>());
             populateStatistics(record, statisticsByName);
           }
         });
@@ -171,16 +172,16 @@ abstract class AbstractStatisticsImporter implements 
StatisticsImporter {
         new StatisticsRecordVisitor() {
           @Override
           public void onTable(StatisticsRecord record) {
-            NameIdentifier identifier = 
parseTableIdentifier(record.identifier());
-            if (targetIdentifier.equals(identifier)) {
+            Optional<NameIdentifier> identifier = 
parseTableIdentifier(record.identifier());
+            if (identifier.isPresent() && 
targetIdentifier.equals(identifier.get())) {
               populateStatistics(record, tableStatistics);
             }
           }
 
           @Override
           public void onPartition(StatisticsRecord record) {
-            NameIdentifier identifier = 
parseTableIdentifier(record.identifier());
-            if (!targetIdentifier.equals(identifier)) {
+            Optional<NameIdentifier> identifier = 
parseTableIdentifier(record.identifier());
+            if (identifier.isEmpty() || 
!targetIdentifier.equals(identifier.get())) {
               return;
             }
 
@@ -210,19 +211,20 @@ abstract class AbstractStatisticsImporter implements 
StatisticsImporter {
         new StatisticsRecordVisitor() {
           @Override
           public void onTable(StatisticsRecord record) {
-            NameIdentifier identifier = 
parseTableIdentifier(record.identifier());
-            if (identifier == null) {
+            Optional<NameIdentifier> identifier = 
parseTableIdentifier(record.identifier());
+            if (identifier.isEmpty()) {
               return;
             }
             Map<String, StatisticValue<?>> tableStats =
-                tableStatisticsByIdentifier.computeIfAbsent(identifier, k -> 
new LinkedHashMap<>());
+                tableStatisticsByIdentifier.computeIfAbsent(
+                    identifier.get(), k -> new LinkedHashMap<>());
             populateStatistics(record, tableStats);
           }
 
           @Override
           public void onPartition(StatisticsRecord record) {
-            NameIdentifier identifier = 
parseTableIdentifier(record.identifier());
-            if (identifier == null) {
+            Optional<NameIdentifier> identifier = 
parseTableIdentifier(record.identifier());
+            if (identifier.isEmpty()) {
               return;
             }
 
@@ -233,7 +235,7 @@ abstract class AbstractStatisticsImporter implements 
StatisticsImporter {
 
             Map<PartitionPath, Map<String, StatisticValue<?>>> 
partitionStatsByPath =
                 partitionStatisticsByIdentifier.computeIfAbsent(
-                    identifier, k -> new LinkedHashMap<>());
+                    identifier.get(), k -> new LinkedHashMap<>());
             Map<String, StatisticValue<?>> partitionStatsByName =
                 partitionStatsByPath.computeIfAbsent(
                     partitionPathOpt.get(), k -> new LinkedHashMap<>());
@@ -265,16 +267,16 @@ abstract class AbstractStatisticsImporter implements 
StatisticsImporter {
         new StatisticsRecordVisitor() {
           @Override
           public void onJob(StatisticsRecord record) {
-            NameIdentifier identifier = 
parseJobIdentifier(record.identifier());
-            if (identifier == null) {
+            Optional<NameIdentifier> identifier = 
parseJobIdentifier(record.identifier());
+            if (identifier.isEmpty()) {
               return;
             }
-            if (targetIdentifier != null && 
!targetIdentifier.equals(identifier)) {
+            if (targetIdentifier != null && 
!targetIdentifier.equals(identifier.get())) {
               return;
             }
 
             Map<String, StatisticValue<?>> statisticsByName =
-                aggregated.computeIfAbsent(identifier, k -> new 
LinkedHashMap<>());
+                aggregated.computeIfAbsent(identifier.get(), k -> new 
LinkedHashMap<>());
             populateStatistics(record, statisticsByName);
           }
         });
@@ -406,42 +408,24 @@ abstract class AbstractStatisticsImporter implements 
StatisticsImporter {
     }
   }
 
-  private NameIdentifier parseTableIdentifier(String identifierText) {
+  private Optional<NameIdentifier> parseTableIdentifier(String identifierText) 
{
     if (StringUtils.isBlank(identifierText)) {
-      return null;
+      return Optional.empty();
     }
-    try {
-      NameIdentifier parsed = NameIdentifier.parse(identifierText);
-      int levels = parsed.namespace().levels().length;
-      if (levels == 0) {
-        return parsed;
-      } else if (levels == 1) {
-        if (StringUtils.isNotBlank(defaultCatalogName)) {
-          return NameIdentifier.of(
-              defaultCatalogName, parsed.namespace().levels()[0], 
parsed.name());
-        }
-        return parsed;
-      } else if (levels == 2) {
-        return parsed;
-      } else {
-        return null;
-      }
-    } catch (Exception e) {
+    Optional<NameIdentifier> parsed =
+        IdentifierUtils.parseTableIdentifier(identifierText, 
defaultCatalogName);
+    if (parsed.isEmpty()) {
       LOG.warn("Skip line with invalid identifier: {}", identifierText);
-      return null;
     }
+    return parsed;
   }
 
-  private NameIdentifier parseJobIdentifier(String identifierText) {
-    if (StringUtils.isBlank(identifierText)) {
-      return null;
-    }
-    try {
-      return NameIdentifier.parse(identifierText);
-    } catch (Exception e) {
+  private Optional<NameIdentifier> parseJobIdentifier(String identifierText) {
+    Optional<NameIdentifier> parsed = 
IdentifierUtils.parseJobIdentifier(identifierText);
+    if (parsed.isEmpty() && StringUtils.isNotBlank(identifierText)) {
       LOG.warn("Skip line with invalid identifier: {}", identifierText);
-      return null;
     }
+    return parsed;
   }
 
   private static final class StatisticsRecord {
diff --git 
a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
 
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
index 2458ac7926..c059ab3483 100644
--- 
a/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++ 
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -23,3 +23,7 @@ 
org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter
 org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter
 
org.apache.gravitino.maintenance.optimizer.updater.statistics.GravitinoStatisticsUpdater
 
org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater
+org.apache.gravitino.maintenance.optimizer.monitor.metrics.GravitinoMetricsProvider
+org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider
+org.apache.gravitino.maintenance.optimizer.monitor.job.local.LocalTableJobRelationProvider
+org.apache.gravitino.maintenance.optimizer.monitor.callback.ConsoleMonitorCallback
diff --git 
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
 
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
similarity index 67%
copy from 
maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
copy to 
maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
index 4cc9447c73..d151270911 100644
--- 
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++ 
b/maintenance/optimizer/src/main/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator
@@ -17,8 +17,4 @@
 # under the License.
 #
 
-org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
-org.apache.gravitino.maintenance.optimizer.updater.MetricsUpdaterForTest
-org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest
-org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest
-org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest
+org.apache.gravitino.maintenance.optimizer.monitor.evaluator.GravitinoMetricsEvaluator
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
index 4b4958ff53..dd50fc4962 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestIdentifierUtils.java
@@ -103,4 +103,67 @@ class TestIdentifierUtils {
         IllegalArgumentException.class,
         () -> IdentifierUtils.requireTableIdentifierNormalized(identifier));
   }
+
+  @Test
+  void testParseTableIdentifierWithDefaultCatalog() {
+    NameIdentifier result =
+        IdentifierUtils.parseTableIdentifier("db.table", 
"catalog").orElseThrow();
+
+    Assertions.assertEquals(NameIdentifier.of("catalog", "db", "table"), 
result);
+  }
+
+  @Test
+  void testParseTableIdentifierWithoutDefaultCatalogReturnsEmpty() {
+    Assertions.assertTrue(IdentifierUtils.parseTableIdentifier("db.table", 
null).isEmpty());
+  }
+
+  @Test
+  void testParseTableIdentifierWithThreeLevelIdentifier() {
+    NameIdentifier result =
+        IdentifierUtils.parseTableIdentifier("catalog.db.table", 
null).orElseThrow();
+
+    Assertions.assertEquals(NameIdentifier.of("catalog", "db", "table"), 
result);
+  }
+
+  @Test
+  void testParseTableIdentifierWithSingleLevelReturnsEmpty() {
+    Assertions.assertTrue(IdentifierUtils.parseTableIdentifier("table", 
"catalog").isEmpty());
+  }
+
+  @Test
+  void testParseTableIdentifierWithInvalidIdentifierReturnsEmpty() {
+    Assertions.assertTrue(
+        IdentifierUtils.parseTableIdentifier("catalog.db.schema.extra.table", 
"c").isEmpty());
+  }
+
+  @Test
+  void testParseJobIdentifierWithValidIdentifier() {
+    NameIdentifier result =
+        
IdentifierUtils.parseJobIdentifier("org.team.pipeline.job1").orElseThrow();
+    Assertions.assertEquals("org.team.pipeline.job1", result.toString());
+  }
+
+  @Test
+  void testParseJobIdentifierWithSingleLevelName() {
+    NameIdentifier result = 
IdentifierUtils.parseJobIdentifier("job").orElseThrow();
+    Assertions.assertEquals("job", result.toString());
+  }
+
+  @Test
+  void testParseJobIdentifierWithTwoLevelName() {
+    NameIdentifier result = 
IdentifierUtils.parseJobIdentifier("team.job").orElseThrow();
+    Assertions.assertEquals("team.job", result.toString());
+  }
+
+  @Test
+  void testParseJobIdentifierWithBlankOrInvalidIdentifierReturnsEmpty() {
+    Assertions.assertTrue(IdentifierUtils.parseJobIdentifier(" ").isEmpty());
+    
Assertions.assertTrue(IdentifierUtils.parseJobIdentifier("invalid..job").isEmpty());
+  }
+
+  @Test
+  void testParseJobIdentifierWithTooManyLevels() {
+    NameIdentifier result = 
IdentifierUtils.parseJobIdentifier("a.b.c.d.e").orElseThrow();
+    Assertions.assertEquals("a.b.c.d.e", result.toString());
+  }
 }
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
index 9801b9f727..3ac0e1a84d 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/common/util/TestProviderUtils.java
@@ -19,12 +19,15 @@
 
 package org.apache.gravitino.maintenance.optimizer.common.util;
 
+import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.maintenance.optimizer.api.recommender.JobSubmitter;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StatisticsProvider;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.StrategyProvider;
 import 
org.apache.gravitino.maintenance.optimizer.api.recommender.TableMetadataProvider;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.callback.ConsoleMonitorCallback;
 import 
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
-import 
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.job.dummy.DummyTableJobRelationProvider;
 import 
org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest;
 import 
org.apache.gravitino.maintenance.optimizer.recommender.job.GravitinoJobSubmitter;
 import 
org.apache.gravitino.maintenance.optimizer.recommender.job.NoopJobSubmitter;
@@ -78,8 +81,20 @@ public class TestProviderUtils {
         
ProviderUtils.createMetricsProviderInstance(MetricsProviderForTest.NAME)
             instanceof MetricsProviderForTest);
     Assertions.assertTrue(
-        ProviderUtils.createJobProviderInstance(JobProviderForTest.NAME)
-            instanceof JobProviderForTest);
+        
ProviderUtils.createTableJobRelationProviderInstance(TableJobRelationProviderForTest.NAME)
+            instanceof TableJobRelationProviderForTest);
+    DummyTableJobRelationProvider dummyTableJobRelationProvider =
+        (DummyTableJobRelationProvider)
+            ProviderUtils.createTableJobRelationProviderInstance(
+                DummyTableJobRelationProvider.NAME);
+    Assertions.assertNotNull(dummyTableJobRelationProvider);
+    Assertions.assertTrue(
+        dummyTableJobRelationProvider
+            .jobIdentifiers(NameIdentifier.parse("catalog.db.table"))
+            .isEmpty());
+    Assertions.assertTrue(
+        
ProviderUtils.createMonitorCallbackInstance(ConsoleMonitorCallback.NAME)
+            instanceof ConsoleMonitorCallback);
     Assertions.assertTrue(
         
ProviderUtils.createMonitorCallbackInstance(MonitorCallbackForTest.NAME)
             instanceof MonitorCallbackForTest);
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
index 5185edf7ce..9a3d934e1c 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/TestMonitor.java
@@ -30,8 +30,9 @@ import 
org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
 import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
 import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
 import 
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.GravitinoMetricsEvaluator;
 import 
org.apache.gravitino.maintenance.optimizer.monitor.evaluator.MetricsEvaluatorForTest;
-import 
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
 import 
org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -44,7 +45,9 @@ public class TestMonitor {
         new OptimizerConfig(
             ImmutableMap.<String, String>builder()
                 .put(OptimizerConfig.METRICS_PROVIDER_CONFIG.getKey(), 
MetricsProviderForTest.NAME)
-                .put(OptimizerConfig.JOB_PROVIDER_CONFIG.getKey(), 
JobProviderForTest.NAME)
+                .put(
+                    
OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG.getKey(),
+                    TableJobRelationProviderForTest.NAME)
                 .put(
                     OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(), 
MetricsEvaluatorForTest.NAME)
                 .put(OptimizerConfig.MONITOR_CALLBACKS_CONFIG.getKey(), 
MonitorCallbackForTest.NAME)
@@ -92,7 +95,7 @@ public class TestMonitor {
             .longValue());
 
     Assertions.assertEquals(MetricScope.Type.JOB, jobResult1.scope().type());
-    Assertions.assertEquals(JobProviderForTest.JOB1, 
jobResult1.scope().identifier());
+    Assertions.assertEquals(TableJobRelationProviderForTest.JOB1, 
jobResult1.scope().identifier());
     Assertions.assertTrue(jobResult1.evaluation());
     Assertions.assertEquals(99L, 
jobResult1.beforeMetrics().get("duration").get(0).timestamp());
     Assertions.assertEquals(102L, 
jobResult1.afterMetrics().get("duration").get(0).timestamp());
@@ -106,7 +109,7 @@ public class TestMonitor {
             .longValue());
 
     Assertions.assertEquals(MetricScope.Type.JOB, jobResult2.scope().type());
-    Assertions.assertEquals(JobProviderForTest.JOB2, 
jobResult2.scope().identifier());
+    Assertions.assertEquals(TableJobRelationProviderForTest.JOB2, 
jobResult2.scope().identifier());
     Assertions.assertFalse(jobResult2.evaluation());
     Assertions.assertEquals(98L, 
jobResult2.beforeMetrics().get("duration").get(0).timestamp());
     Assertions.assertEquals(104L, 
jobResult2.afterMetrics().get("duration").get(0).timestamp());
@@ -126,7 +129,9 @@ public class TestMonitor {
         new OptimizerConfig(
             ImmutableMap.<String, String>builder()
                 .put(OptimizerConfig.METRICS_PROVIDER_CONFIG.getKey(), 
MetricsProviderForTest.NAME)
-                .put(OptimizerConfig.JOB_PROVIDER_CONFIG.getKey(), 
JobProviderForTest.NAME)
+                .put(
+                    
OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG.getKey(),
+                    TableJobRelationProviderForTest.NAME)
                 .put(
                     OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(), 
MetricsEvaluatorForTest.NAME)
                 .put(OptimizerConfig.MONITOR_CALLBACKS_CONFIG.getKey(), 
MonitorCallbackForTest.NAME)
@@ -172,7 +177,9 @@ public class TestMonitor {
         new OptimizerConfig(
             ImmutableMap.<String, String>builder()
                 .put(OptimizerConfig.METRICS_PROVIDER_CONFIG.getKey(), 
MetricsProviderForTest.NAME)
-                .put(OptimizerConfig.JOB_PROVIDER_CONFIG.getKey(), 
JobProviderForTest.NAME)
+                .put(
+                    
OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG.getKey(),
+                    TableJobRelationProviderForTest.NAME)
                 .put(
                     OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(), 
MetricsEvaluatorForTest.NAME)
                 .put(OptimizerConfig.MONITOR_CALLBACKS_CONFIG.getKey(), 
MonitorCallbackForTest.NAME)
@@ -189,4 +196,35 @@ public class TestMonitor {
       Assertions.assertTrue(exception.getMessage().contains("time range 
overflow"));
     }
   }
+
+  @Test
+  public void testEvaluateMetricsWithScopedGravitinoEvaluatorRules() throws 
Exception {
+    OptimizerConfig config =
+        new OptimizerConfig(
+            ImmutableMap.<String, String>builder()
+                .put(OptimizerConfig.METRICS_PROVIDER_CONFIG.getKey(), 
MetricsProviderForTest.NAME)
+                .put(
+                    
OptimizerConfig.TABLE_JOB_RELATION_PROVIDER_CONFIG.getKey(),
+                    TableJobRelationProviderForTest.NAME)
+                .put(
+                    OptimizerConfig.METRICS_EVALUATOR_CONFIG.getKey(),
+                    GravitinoMetricsEvaluator.NAME)
+                .put(
+                    GravitinoMetricsEvaluator.EVALUATION_RULES_CONFIG,
+                    "table:row_count:avg:le,job:duration:latest:le")
+                .build());
+
+    OptimizerEnv env = new OptimizerEnv(config);
+    NameIdentifier tableIdentifier = NameIdentifier.parse("test.db.table");
+
+    List<EvaluationResult> results;
+    try (Monitor monitor = new Monitor(env)) {
+      results = monitor.evaluateMetrics(tableIdentifier, 100L, 10L, 
Optional.empty());
+    }
+
+    Assertions.assertEquals(3, results.size(), "Expected one table result and 
two job results");
+    Assertions.assertFalse(results.get(0).evaluation(), "Table rule should 
fail for test metrics");
+    Assertions.assertFalse(results.get(1).evaluation(), "Job1 latest duration 
rule should fail");
+    Assertions.assertFalse(results.get(2).evaluation(), "Job2 latest duration 
rule should fail");
+  }
 }
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
index 9d9a994d04..fab16fcf1f 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/MetricsEvaluatorForTest.java
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
 import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
 import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsEvaluator;
-import 
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
 
 public class MetricsEvaluatorForTest implements MetricsEvaluator {
 
@@ -46,7 +46,7 @@ public class MetricsEvaluatorForTest implements 
MetricsEvaluator {
     INVOCATIONS.incrementAndGet();
     if (FAIL_JOB2
         && scope.type() == MetricScope.Type.JOB
-        && JobProviderForTest.JOB2.equals(scope.identifier())) {
+        && TableJobRelationProviderForTest.JOB2.equals(scope.identifier())) {
       return false;
     }
     return true;
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/TestGravitinoMetricsEvaluator.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/TestGravitinoMetricsEvaluator.java
new file mode 100644
index 0000000000..5120d42e24
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/evaluator/TestGravitinoMetricsEvaluator.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.evaluator;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
+import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricScope;
+import org.apache.gravitino.maintenance.optimizer.common.MetricSampleImpl;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.StatisticEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestGravitinoMetricsEvaluator {
+
+  @Test
+  public void testEvaluateMetricsWithMaxOperation() {
+    GravitinoMetricsEvaluator evaluator = 
createEvaluator("table:row_count:max:le");
+    MetricScope scope = 
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            scope,
+            Map.of(
+                "row_count",
+                List.of(
+                    metric(100L, "row_count", 10L),
+                    metric(101L, "row_count", 50L),
+                    metric(102L, "row_count", 20L))),
+            Map.of("row_count", List.of(metric(110L, "row_count", 30L))));
+
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  public void testEvaluateMetricsWithMinOperation() {
+    GravitinoMetricsEvaluator evaluator = 
createEvaluator("table:row_count:min:le");
+    MetricScope scope = 
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            scope,
+            Map.of(
+                "row_count",
+                List.of(metric(100L, "row_count", 10L), metric(101L, 
"row_count", 30L))),
+            Map.of("row_count", List.of(metric(110L, "row_count", 12L))));
+
+    Assertions.assertFalse(result);
+  }
+
+  @Test
+  public void testEvaluateMetricsWithAvgOperation() {
+    GravitinoMetricsEvaluator evaluator = 
createEvaluator("table:row_count:avg:le");
+    MetricScope scope = 
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            scope,
+            Map.of(
+                "row_count",
+                List.of(metric(100L, "row_count", 10L), metric(101L, 
"row_count", 20L))),
+            Map.of("row_count", List.of(metric(110L, "row_count", 15L))));
+
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  public void testEvaluateMetricsWithLatestOperation() {
+    GravitinoMetricsEvaluator evaluator = 
createEvaluator("table:row_count:latest:le");
+    MetricScope scope = 
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            scope,
+            Map.of(
+                "row_count",
+                List.of(metric(100L, "row_count", 100L), metric(105L, 
"row_count", 10L))),
+            Map.of("row_count", List.of(metric(110L, "row_count", 20L))));
+
+    Assertions.assertFalse(result);
+  }
+
+  @Test
+  public void testEvaluateMetricsForPartitionUsesTableRules() {
+    GravitinoMetricsEvaluator evaluator = 
createEvaluator("table:row_count:latest:le");
+    PartitionPath partitionPath =
+        PartitionPath.of(List.of(new PartitionEntryImpl("dt", "2026-02-14")));
+    MetricScope scope =
+        MetricScope.forPartition(NameIdentifier.parse("catalog.db.table"), 
partitionPath);
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            scope,
+            Map.of(
+                "row_count",
+                List.of(metric(100L, "row_count", 100L), metric(105L, 
"row_count", 10L))),
+            Map.of("row_count", List.of(metric(110L, "row_count", 20L))));
+
+    Assertions.assertFalse(result);
+  }
+
+  @Test
+  public void testEvaluateMetricsDistinguishTableAndJobRules() {
+    GravitinoMetricsEvaluator evaluator =
+        createEvaluator("table:row_count:avg:le,job:duration:latest:le");
+
+    boolean tableResult =
+        evaluator.evaluateMetrics(
+            MetricScope.forTable(NameIdentifier.parse("catalog.db.table")),
+            Map.of(
+                "row_count",
+                List.of(metric(100L, "row_count", 10L), metric(101L, 
"row_count", 20L))),
+            Map.of("row_count", List.of(metric(110L, "row_count", 15L))));
+    Assertions.assertTrue(tableResult);
+
+    boolean jobResult =
+        evaluator.evaluateMetrics(
+            MetricScope.forJob(NameIdentifier.parse("job1")),
+            Map.of("duration", List.of(metric(100L, "duration", 3L), 
metric(105L, "duration", 5L))),
+            Map.of("duration", List.of(metric(110L, "duration", 10L))));
+    Assertions.assertFalse(jobResult);
+  }
+
+  @Test
+  public void testEvaluateMetricsMissingSelectedMetricSkipsRule() {
+    GravitinoMetricsEvaluator evaluator = 
createEvaluator("table:row_count:avg:le");
+    MetricScope scope = 
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            scope,
+            Map.of("other_metric", List.of(metric(100L, "other_metric", 10L))),
+            Map.of("other_metric", List.of(metric(110L, "other_metric", 5L))));
+
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  public void testEvaluateMetricsNonNumericValueSkipsRule() {
+    GravitinoMetricsEvaluator evaluator = 
createEvaluator("table:row_count:avg:le");
+    MetricScope scope = 
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            scope,
+            Map.of("row_count", List.of(metricString(100L, "row_count", 
"invalid"))),
+            Map.of("row_count", List.of(metricString(110L, "row_count", 
"invalid2"))));
+
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  public void testEvaluateMetricsWithoutRulesForScopeReturnsTrue() {
+    GravitinoMetricsEvaluator evaluator = 
createEvaluator("table:row_count:avg:le");
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            MetricScope.forJob(NameIdentifier.parse("job1")),
+            Map.of("duration", List.of(metric(100L, "duration", 10L))),
+            Map.of("duration", List.of(metric(110L, "duration", 20L))));
+
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  public void testEvaluateMetricsWithMixedCaseRuleMetricName() {
+    GravitinoMetricsEvaluator evaluator = 
createEvaluator("table:Row_Count:avg:le");
+    MetricScope scope = 
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            scope,
+            Map.of(
+                "row_count",
+                List.of(metric(100L, "row_count", 10L), metric(101L, 
"row_count", 20L))),
+            Map.of("row_count", List.of(metric(110L, "row_count", 15L))));
+
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  public void testEvaluateMetricsWithEmptyBeforeAndAfterMetricsReturnsTrue() {
+    GravitinoMetricsEvaluator evaluator = 
createEvaluator("table:row_count:avg:le");
+    MetricScope scope = 
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+    boolean result = evaluator.evaluateMetrics(scope, Map.of(), Map.of());
+    Assertions.assertTrue(result);
+
+    boolean emptyListResult =
+        evaluator.evaluateMetrics(
+            scope, Map.of("row_count", List.of()), Map.of("row_count", 
List.of()));
+    Assertions.assertTrue(emptyListResult);
+  }
+
+  @Test
+  public void testEvaluateMetricsReturnsFalseWhenAnyRuleFails() {
+    GravitinoMetricsEvaluator evaluator =
+        createEvaluator("table:row_count:avg:le,table:file_count:max:le");
+    MetricScope scope = 
MetricScope.forTable(NameIdentifier.parse("catalog.db.table"));
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            scope,
+            Map.of(
+                "row_count",
+                List.of(metric(100L, "row_count", 10L), metric(101L, 
"row_count", 20L)),
+                "file_count",
+                List.of(metric(100L, "file_count", 5L), metric(101L, 
"file_count", 10L))),
+            Map.of(
+                "row_count",
+                List.of(metric(110L, "row_count", 15L)),
+                "file_count",
+                List.of(metric(110L, "file_count", 11L))));
+
+    Assertions.assertFalse(result);
+  }
+
+  @Test
+  public void testInitializeAcceptsWhitespaceAndCaseInsensitiveRuleTokens() {
+    GravitinoMetricsEvaluator evaluator =
+        createEvaluator(" TABLE : row_count : AVG : LE , JOB : duration : 
latest : le , ");
+
+    boolean tableResult =
+        evaluator.evaluateMetrics(
+            MetricScope.forTable(NameIdentifier.parse("catalog.db.table")),
+            Map.of(
+                "row_count",
+                List.of(metric(100L, "row_count", 10L), metric(101L, 
"row_count", 20L))),
+            Map.of("row_count", List.of(metric(110L, "row_count", 15L))));
+    Assertions.assertTrue(tableResult);
+
+    boolean jobResult =
+        evaluator.evaluateMetrics(
+            MetricScope.forJob(NameIdentifier.parse("job1")),
+            Map.of("duration", List.of(metric(100L, "duration", 3L), 
metric(105L, "duration", 5L))),
+            Map.of("duration", List.of(metric(110L, "duration", 10L))));
+    Assertions.assertFalse(jobResult);
+  }
+
+  @Test
+  public void testParseEvaluationRulesReturnsEmptyForBlankInput() {
+    Assertions.assertTrue(GravitinoMetricsEvaluator.parseEvaluationRules("  
").isEmpty());
+  }
+
+  @Test
+  public void testParseEvaluationRulesParsesExpectedScopesAndRules() {
+    Map<?, ?> parsed =
+        GravitinoMetricsEvaluator.parseEvaluationRules(
+            "table:row_count:avg:le,job:duration:max:gt");
+
+    Assertions.assertEquals(2, parsed.size());
+    Map<?, ?> tableRules = findScopeRules(parsed, "TABLE");
+    Map<?, ?> jobRules = findScopeRules(parsed, "JOB");
+    Assertions.assertEquals(1, tableRules.size());
+    Assertions.assertEquals(1, jobRules.size());
+    Assertions.assertEquals("AVG:LE", tableRules.get("row_count").toString());
+    Assertions.assertEquals("MAX:GT", jobRules.get("duration").toString());
+  }
+
+  @Test
+  public void testParseEvaluationRulesCreatesImmutableMaps() {
+    Map<?, ?> parsed = 
GravitinoMetricsEvaluator.parseEvaluationRules("table:row_count:avg:le");
+
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> parsedMutableView = (Map<Object, Object>) parsed;
+    Assertions.assertThrows(
+        UnsupportedOperationException.class, () -> 
parsedMutableView.put("TABLE", Map.of()));
+    Map<?, ?> tableRules = findScopeRules(parsed, "TABLE");
+    @SuppressWarnings("unchecked")
+    Map<Object, Object> tableRulesMutableView = (Map<Object, Object>) 
tableRules;
+    Assertions.assertThrows(
+        UnsupportedOperationException.class, () -> 
tableRulesMutableView.put("x", "y"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesRejectsInvalidAggregation() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
GravitinoMetricsEvaluator.parseEvaluationRules("table:row_count:sum:le"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesRejectsPartitionScopeInRule() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
GravitinoMetricsEvaluator.parseEvaluationRules("partition:row_count:avg:le"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesRejectsRuleWithoutScope() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
GravitinoMetricsEvaluator.parseEvaluationRules("row_count:avg:le"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesRejectsDuplicateRuleInSameScope() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            GravitinoMetricsEvaluator.parseEvaluationRules(
+                "table:row_count:avg:le,table:row_count:max:le"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesRejectsMalformedRuleToken() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
GravitinoMetricsEvaluator.parseEvaluationRules("table:row_count:avg:le:extra"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesRejectsLegacyScopeMetricRuleFormat() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
GravitinoMetricsEvaluator.parseEvaluationRules("table.row_count:avg:le"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesRejectsRuleWithBlankMetricName() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> GravitinoMetricsEvaluator.parseEvaluationRules("table::avg:le"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesRejectsRuleWithBlankScope() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
GravitinoMetricsEvaluator.parseEvaluationRules(":row_count:avg:le"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesRejectsRuleWithBlankAggregation() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
GravitinoMetricsEvaluator.parseEvaluationRules("table:row_count::le"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesRejectsRuleWithBlankComparison() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
GravitinoMetricsEvaluator.parseEvaluationRules("table:row_count:avg:"));
+  }
+
+  @Test
+  public void 
testParseEvaluationRulesRejectsDuplicateMetricRuleAfterNormalization() {
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            GravitinoMetricsEvaluator.parseEvaluationRules(
+                "table:Row_Count:avg:le,table:row_count:max:le"));
+  }
+
+  @Test
+  public void testParseEvaluationRulesAllowsBlankRuleEntriesBetweenCommas() {
+    Map<?, ?> parsed =
+        GravitinoMetricsEvaluator.parseEvaluationRules(
+            "table:row_count:avg:le,,job:duration:latest:le,");
+
+    Assertions.assertEquals(2, parsed.size());
+    Map<?, ?> tableRules = findScopeRules(parsed, "TABLE");
+    Map<?, ?> jobRules = findScopeRules(parsed, "JOB");
+    Assertions.assertEquals("AVG:LE", tableRules.get("row_count").toString());
+    Assertions.assertEquals("LATEST:LE", jobRules.get("duration").toString());
+  }
+
+  @Test
+  public void testInitializeAllowsBlankRules() {
+    GravitinoMetricsEvaluator evaluator = new GravitinoMetricsEvaluator();
+    evaluator.initialize(optimizerEnv("  "));
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            MetricScope.forTable(NameIdentifier.parse("catalog.db.table")),
+            Map.of("row_count", List.of(metric(100L, "row_count", 10L))),
+            Map.of("row_count", List.of(metric(110L, "row_count", 999L))));
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  public void testInitializeAllowsMissingRules() {
+    GravitinoMetricsEvaluator evaluator = new GravitinoMetricsEvaluator();
+    evaluator.initialize(new OptimizerEnv(new OptimizerConfig(Map.of())));
+
+    boolean result =
+        evaluator.evaluateMetrics(
+            MetricScope.forJob(NameIdentifier.parse("job1")),
+            Map.of("duration", List.of(metric(100L, "duration", 1L))),
+            Map.of("duration", List.of(metric(110L, "duration", 999L))));
+    Assertions.assertTrue(result);
+  }
+
+  @Test
+  public void testInitializeRejectsInvalidComparison() {
+    GravitinoMetricsEvaluator evaluator = new GravitinoMetricsEvaluator();
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> 
evaluator.initialize(optimizerEnv("table:row_count:avg:decrease")));
+  }
+
+  private static GravitinoMetricsEvaluator createEvaluator(String rules) {
+    GravitinoMetricsEvaluator evaluator = new GravitinoMetricsEvaluator();
+    evaluator.initialize(optimizerEnv(rules));
+    return evaluator;
+  }
+
+  private static OptimizerEnv optimizerEnv(String rules) {
+    return new OptimizerEnv(
+        new 
OptimizerConfig(Map.of(GravitinoMetricsEvaluator.EVALUATION_RULES_CONFIG, 
rules)));
+  }
+
+  private static MetricSample metric(long timestamp, String name, long value) {
+    return new MetricSampleImpl(timestamp, entry(name, 
StatisticValues.longValue(value)));
+  }
+
+  private static MetricSample metricString(long timestamp, String name, String 
value) {
+    return new MetricSampleImpl(timestamp, entry(name, 
StatisticValues.stringValue(value)));
+  }
+
+  private static StatisticEntry<?> entry(
+      String name, org.apache.gravitino.stats.StatisticValue<?> value) {
+    return new StatisticEntryImpl(name, value);
+  }
+
+  private static Map<?, ?> findScopeRules(Map<?, ?> parsedRules, String 
scopeName) {
+    return parsedRules.entrySet().stream()
+        .filter(entry -> scopeName.equals(entry.getKey().toString()))
+        .map(Map.Entry::getValue)
+        .map(Map.class::cast)
+        .findFirst()
+        .orElseThrow(() -> new AssertionError("Missing expected scope " + 
scopeName));
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TableJobRelationProviderForTest.java
similarity index 85%
rename from 
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
rename to 
maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TableJobRelationProviderForTest.java
index 5ab2cebd61..888d06efa3 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/JobProviderForTest.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TableJobRelationProviderForTest.java
@@ -21,12 +21,12 @@ package 
org.apache.gravitino.maintenance.optimizer.monitor.job;
 
 import java.util.List;
 import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.maintenance.optimizer.api.monitor.JobProvider;
+import 
org.apache.gravitino.maintenance.optimizer.api.monitor.TableJobRelationProvider;
 import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
 
-public class JobProviderForTest implements JobProvider {
+public class TableJobRelationProviderForTest implements 
TableJobRelationProvider {
 
-  public static final String NAME = "job-provider-for-test";
+  public static final String NAME = "table-job-relation-provider-for-test";
   public static final NameIdentifier JOB1 = 
NameIdentifier.parse("test.db.job1");
   public static final NameIdentifier JOB2 = 
NameIdentifier.parse("test.db.job2");
 
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TestLocalTableJobRelationProvider.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TestLocalTableJobRelationProvider.java
new file mode 100644
index 0000000000..2d0b96261f
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/job/TestLocalTableJobRelationProvider.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.job;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.job.local.LocalTableJobRelationProvider;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+class TestLocalTableJobRelationProvider {
+
+  @TempDir Path tempDir;
+
+  @Test
+  void testGetJobNamesMergesAndNormalizesIdentifiers() throws IOException {
+    Path jobFile = tempDir.resolve("jobs.jsonl");
+    Files.write(
+        jobFile,
+        List.of(
+            
"{\"identifier\":\"catalog.schema.table\",\"job-identifiers\":[\"job1\",\"job2\"]}",
+            
"{\"identifier\":\"schema.table\",\"job-identifiers\":[\"job2\",\"job3\"]}",
+            
"{\"identifier\":\"catalog.schema.table\",\"job-identifiers\":[\"job4\",\"invalid..job\"]}",
+            
"{\"identifier\":\"catalog.schema.table\",\"job-identifiers\":[\"\",12]}",
+            "{\"identifier\":\"other.table\",\"job-identifiers\":[\"jobX\"]}",
+            "malformed json"));
+
+    LocalTableJobRelationProvider provider = new 
LocalTableJobRelationProvider();
+    OptimizerEnv optimizerEnv = new OptimizerEnv(createConfig(jobFile));
+    provider.initialize(optimizerEnv);
+
+    List<NameIdentifier> jobs =
+        provider.jobIdentifiers(NameIdentifier.parse("catalog.schema.table"));
+
+    Assertions.assertEquals(
+        List.of(
+            NameIdentifier.parse("job1"),
+            NameIdentifier.parse("job2"),
+            NameIdentifier.parse("job3"),
+            NameIdentifier.parse("job4")),
+        jobs);
+  }
+
+  @Test
+  void testInitializeRequiresFilePath() {
+    LocalTableJobRelationProvider provider = new 
LocalTableJobRelationProvider();
+    OptimizerEnv optimizerEnv = new OptimizerEnv(new OptimizerConfig());
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class, () -> 
provider.initialize(optimizerEnv));
+  }
+
+  @Test
+  void testInitializeRejectsMissingFilePath() {
+    LocalTableJobRelationProvider provider = new 
LocalTableJobRelationProvider();
+    Path missingFile = tempDir.resolve("missing-jobs.jsonl");
+    OptimizerEnv optimizerEnv = new OptimizerEnv(createConfig(missingFile));
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class, () -> 
provider.initialize(optimizerEnv));
+  }
+
+  @Test
+  void testJobIdentifiersRequiresInitialize() {
+    LocalTableJobRelationProvider provider = new 
LocalTableJobRelationProvider();
+    Assertions.assertThrows(
+        IllegalStateException.class,
+        () -> 
provider.jobIdentifiers(NameIdentifier.parse("catalog.schema.table")));
+  }
+
+  private OptimizerConfig createConfig(Path jobFile) {
+    Map<String, String> configs = new HashMap<>();
+    configs.put(LocalTableJobRelationProvider.JOB_FILE_PATH_CONFIG, 
jobFile.toString());
+    configs.put(OptimizerConfig.GRAVITINO_DEFAULT_CATALOG_CONFIG.getKey(), 
"catalog");
+    return new OptimizerConfig(configs);
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
index 3f371c24c1..2f862dcd50 100644
--- 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/MetricsProviderForTest.java
@@ -28,7 +28,7 @@ import 
org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
 import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry;
 import org.apache.gravitino.maintenance.optimizer.api.monitor.MetricsProvider;
 import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
-import 
org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest;
+import 
org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest;
 import org.apache.gravitino.stats.StatisticValue;
 import org.apache.gravitino.stats.StatisticValues;
 
@@ -49,14 +49,14 @@ public class MetricsProviderForTest implements 
MetricsProvider {
   @Override
   public Map<String, List<MetricSample>> jobMetrics(
       NameIdentifier jobIdentifier, long startTime, long endTime) {
-    if (JobProviderForTest.JOB1.equals(jobIdentifier)) {
+    if (TableJobRelationProviderForTest.JOB1.equals(jobIdentifier)) {
       return Map.of(
           "duration",
           List.of(
               metric(99, "duration", StatisticValues.longValue(10L)),
               metric(102, "duration", StatisticValues.longValue(20L))));
     }
-    if (JobProviderForTest.JOB2.equals(jobIdentifier)) {
+    if (TableJobRelationProviderForTest.JOB2.equals(jobIdentifier)) {
       return Map.of(
           "duration",
           List.of(
diff --git 
a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/TestGravitinoMetricsProvider.java
 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/TestGravitinoMetricsProvider.java
new file mode 100644
index 0000000000..e19091e8ce
--- /dev/null
+++ 
b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/monitor/metrics/TestGravitinoMetricsProvider.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.maintenance.optimizer.monitor.metrics;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.maintenance.optimizer.api.common.MetricSample;
+import 
org.apache.gravitino.maintenance.optimizer.api.common.PartitionMetricSample;
+import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath;
+import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv;
+import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl;
+import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig;
+import 
org.apache.gravitino.maintenance.optimizer.common.util.StatisticValueUtils;
+import 
org.apache.gravitino.maintenance.optimizer.recommender.util.PartitionUtils;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.JobMetricWriteRequest;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricRecordImpl;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.MetricsRepository;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.TableMetricWriteRequest;
+import 
org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository;
+import org.apache.gravitino.stats.StatisticValues;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestGravitinoMetricsProvider {
+
+  @TempDir Path tempDir;
+
+  @Test
+  public void testReadTableJobAndPartitionMetrics() throws Exception {
+    Path metricsPath = tempDir.resolve("metrics-test");
+    String jdbcUrl = "jdbc:h2:file:" + metricsPath + 
";DB_CLOSE_DELAY=-1;MODE=MYSQL";
+    Map<String, String> configs =
+        Map.of(
+            OptimizerConfig.OPTIMIZER_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_URL,
+            jdbcUrl,
+            OptimizerConfig.OPTIMIZER_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_USER,
+            "sa",
+            OptimizerConfig.OPTIMIZER_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_METRICS_PREFIX
+                + GenericJdbcMetricsRepository.JDBC_PASSWORD,
+            "");
+    OptimizerEnv optimizerEnv = new OptimizerEnv(new OptimizerConfig(configs));
+
+    NameIdentifier table = NameIdentifier.parse("catalog.db.table1");
+    NameIdentifier job = NameIdentifier.parse("job1");
+    PartitionPath partitionPath =
+        PartitionPath.of(List.of(new PartitionEntryImpl("dt", "2026-02-14")));
+
+    try (MetricsRepository metricsRepository = new 
GenericJdbcMetricsRepository()) {
+      metricsRepository.initialize(configs);
+      metricsRepository.storeTableMetrics(
+          List.of(
+              new TableMetricWriteRequest(
+                  table,
+                  "row_count",
+                  Optional.empty(),
+                  new MetricRecordImpl(
+                      100L, 
StatisticValueUtils.toString(StatisticValues.longValue(10L))))));
+      metricsRepository.storeTableMetrics(
+          List.of(
+              new TableMetricWriteRequest(
+                  table,
+                  "row_count",
+                  
Optional.of(PartitionUtils.encodePartitionPath(partitionPath)),
+                  new MetricRecordImpl(
+                      101L, 
StatisticValueUtils.toString(StatisticValues.longValue(11L))))));
+      metricsRepository.storeJobMetrics(
+          List.of(
+              new JobMetricWriteRequest(
+                  job,
+                  "duration",
+                  new MetricRecordImpl(
+                      102L, 
StatisticValueUtils.toString(StatisticValues.longValue(99L))))));
+    }
+
+    GravitinoMetricsProvider provider = new GravitinoMetricsProvider();
+    provider.initialize(optimizerEnv);
+    try {
+      Map<String, List<MetricSample>> tableMetrics = 
provider.tableMetrics(table, 0L, 200L);
+      Assertions.assertEquals(1, tableMetrics.get("row_count").size());
+      Assertions.assertEquals(
+          10L, 
tableMetrics.get("row_count").get(0).statistic().value().value());
+
+      Map<String, List<MetricSample>> partitionMetrics =
+          provider.partitionMetrics(table, partitionPath, 0L, 200L);
+      Assertions.assertEquals(1, partitionMetrics.get("row_count").size());
+      Assertions.assertTrue(
+          partitionMetrics.get("row_count").get(0) instanceof 
PartitionMetricSample);
+
+      Map<String, List<MetricSample>> jobMetrics = provider.jobMetrics(job, 
0L, 200L);
+      Assertions.assertEquals(1, jobMetrics.get("duration").size());
+      Assertions.assertEquals(99L, 
jobMetrics.get("duration").get(0).statistic().value().value());
+    } finally {
+      provider.close();
+    }
+  }
+}
diff --git 
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
 
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
index 4cc9447c73..5048a10cb0 100644
--- 
a/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
+++ 
b/maintenance/optimizer/src/test/resources/META-INF/services/org.apache.gravitino.maintenance.optimizer.api.common.Provider
@@ -19,6 +19,6 @@
 
 org.apache.gravitino.maintenance.optimizer.updater.StatisticsUpdaterForTest
 org.apache.gravitino.maintenance.optimizer.updater.MetricsUpdaterForTest
-org.apache.gravitino.maintenance.optimizer.monitor.job.JobProviderForTest
+org.apache.gravitino.maintenance.optimizer.monitor.job.TableJobRelationProviderForTest
 
org.apache.gravitino.maintenance.optimizer.monitor.metrics.MetricsProviderForTest
 
org.apache.gravitino.maintenance.optimizer.monitor.callback.MonitorCallbackForTest

Reply via email to