This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new ca7faaede2 [#7270] feat(core): Support Statistic manager (#7734)
ca7faaede2 is described below

commit ca7faaede2670d5ccf1c8ff6293125f4b953cf64
Author: roryqi <[email protected]>
AuthorDate: Fri Aug 15 19:04:39 2025 +0800

    [#7270] feat(core): Support Statistic manager (#7734)
    
    ### What changes were proposed in this pull request?
    
    Support Statistic manager
    
    ### Why are the changes needed?
    
    Fix: #7270
    
    ### Does this PR introduce _any_ user-facing change?
    
    I will add the documentation later.
    
    ### How was this patch tested?
    
    Add UT.
---
 .../stats/SupportsPartitionStatistics.java         |   4 +-
 .../apache/gravitino/stats/SupportsStatistics.java |   3 +-
 .../src/main/java/org/apache/gravitino/Entity.java |   2 +-
 .../java/org/apache/gravitino/EntityStore.java     |  26 ++
 .../java/org/apache/gravitino/GravitinoEnv.java    |   7 +
 .../gravitino/SupportsRelationOperations.java      |   2 +-
 .../org/apache/gravitino/meta/StatisticEntity.java |  95 ++++---
 .../gravitino/meta/TableStatisticEntity.java       |  43 +++
 .../apache/gravitino/stats/StatisticManager.java   | 213 +++++++++++++++
 .../gravitino/storage/relational/JDBCBackend.java  |  84 +++++-
 .../storage/relational/RelationalBackend.java      |  24 ++
 .../storage/relational/RelationalEntityStore.java  |  14 +
 .../storage/relational/po/StatisticPO.java         |   4 +-
 .../relational/service/StatisticMetaService.java   |   7 +-
 .../gravitino/stats/TestStatisticManager.java      | 287 +++++++++++++++++++++
 .../storage/memory/TestMemoryEntityStore.java      |  14 +
 .../service/TestStatisticMetaService.java          |  31 +--
 .../storage/relational/utils/TestPOConverters.java |   5 +-
 18 files changed, 799 insertions(+), 66 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java 
b/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
index 12eac9e4cc..8d25aba2c2 100644
--- 
a/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
+++ 
b/api/src/main/java/org/apache/gravitino/stats/SupportsPartitionStatistics.java
@@ -43,11 +43,9 @@ public interface SupportsPartitionStatistics {
    * @param statisticsToUpdate a list of PartitionUpdateStatistics, where each
    *     PartitionStatisticsUpdate contains the partition name and a map of 
statistic names to their
    *     values to be updated.
-   * @return a list of updated PartitionStatistics, where each 
PartitionStatistics contains the
-   *     partition name and a list of statistics applicable to that partition.
    * @throws UnmodifiableStatisticException if any of the statistics to be 
updated are unmodifiable
    */
-  List<PartitionStatistics> updateStatistics(List<PartitionStatisticsUpdate> 
statisticsToUpdate)
+  void updateStatistics(List<PartitionStatisticsUpdate> statisticsToUpdate)
       throws UnmodifiableStatisticException;
 
   /**
diff --git 
a/api/src/main/java/org/apache/gravitino/stats/SupportsStatistics.java 
b/api/src/main/java/org/apache/gravitino/stats/SupportsStatistics.java
index dd8fa8a553..5923b4a089 100644
--- a/api/src/main/java/org/apache/gravitino/stats/SupportsStatistics.java
+++ b/api/src/main/java/org/apache/gravitino/stats/SupportsStatistics.java
@@ -45,9 +45,8 @@ public interface SupportsStatistics {
    * illegal, it will throw an IllegalStatisticNameException.
    *
    * @param statistics a map of statistic names to their values
-   * @return a list of updated statistics
    */
-  List<Statistic> updateStatistics(Map<String, StatisticValue<?>> statistics)
+  void updateStatistics(Map<String, StatisticValue<?>> statistics)
       throws UnmodifiableStatisticException, IllegalStatisticNameException;
 
   /**
diff --git a/core/src/main/java/org/apache/gravitino/Entity.java 
b/core/src/main/java/org/apache/gravitino/Entity.java
index d08ddd0011..fc575b72b1 100644
--- a/core/src/main/java/org/apache/gravitino/Entity.java
+++ b/core/src/main/java/org/apache/gravitino/Entity.java
@@ -77,7 +77,7 @@ public interface Entity extends Serializable {
     MODEL,
     MODEL_VERSION,
     POLICY,
-    STATISTIC,
+    TABLE_STATISTIC,
     JOB_TEMPLATE,
     JOB,
     AUDIT;
diff --git a/core/src/main/java/org/apache/gravitino/EntityStore.java 
b/core/src/main/java/org/apache/gravitino/EntityStore.java
index 2993cefe98..79f5e12e26 100644
--- a/core/src/main/java/org/apache/gravitino/EntityStore.java
+++ b/core/src/main/java/org/apache/gravitino/EntityStore.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.Entity.EntityType;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.tag.SupportsTagOperations;
@@ -190,6 +191,31 @@ public interface EntityStore extends Closeable {
    */
   boolean delete(NameIdentifier ident, EntityType entityType, boolean cascade) 
throws IOException;
 
+  /**
+   * Batch delete entities from the underlying storage by the specified list 
of {@link
+   * org.apache.gravitino.NameIdentifier} and {@link EntityType}.
+   *
+   * @param entitiesToDelete the list of pairs of name identifiers and entity 
types to be deleted
+   * @param cascade if true, cascade delete the entities, otherwise just 
delete the entities
+   * @return the number of entities deleted
+   * @throws IOException if the batch delete operation fails
+   */
+  int batchDelete(List<Pair<NameIdentifier, EntityType>> entitiesToDelete, 
boolean cascade)
+      throws IOException;
+
+  /**
+   * Batch put entities into the underlying storage.
+   *
+   * @param entities the list of entities to be stored
+   * @param overwritten if true, overwrite the existing entities; otherwise 
fail if any exists
+   * @param <E> the type of the entities
+   * @throws IOException if the batch put operation fails
+   * @throws EntityAlreadyExistsException if the entity already exists and the 
overwritten flag is
+   *     false
+   */
+  <E extends Entity & HasIdentifier> void batchPut(List<E> entities, boolean 
overwritten)
+      throws IOException, EntityAlreadyExistsException;
+
   /**
    * Execute the specified {@link Executable} in a transaction.
    *
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index cdc90ee63d..591542a36d 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -80,6 +80,7 @@ import org.apache.gravitino.metrics.MetricsSystem;
 import org.apache.gravitino.metrics.source.JVMMetricsSource;
 import org.apache.gravitino.policy.PolicyDispatcher;
 import org.apache.gravitino.policy.PolicyManager;
+import org.apache.gravitino.stats.StatisticManager;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.storage.RandomIdGenerator;
 import org.apache.gravitino.tag.TagDispatcher;
@@ -146,6 +147,7 @@ public class GravitinoEnv {
   private OwnerDispatcher ownerDispatcher;
   private FutureGrantManager futureGrantManager;
   private GravitinoAuthorizer gravitinoAuthorizer;
+  private StatisticManager statisticManager;
 
   protected GravitinoEnv() {}
 
@@ -413,6 +415,10 @@ public class GravitinoEnv {
     return jobOperationDispatcher;
   }
 
+  public StatisticManager statisticManager() {
+    return statisticManager;
+  }
+
   public void start() {
     metricsSystem.start();
     eventListenerManager.start();
@@ -557,6 +563,7 @@ public class GravitinoEnv {
     ModelNormalizeDispatcher modelNormalizeDispatcher =
         new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
     this.modelDispatcher = new ModelEventDispatcher(eventBus, 
modelNormalizeDispatcher);
+    this.statisticManager = new StatisticManager(entityStore, idGenerator);
 
     // Create and initialize access control related modules
     boolean enableAuthorization = config.get(Configs.ENABLE_AUTHORIZATION);
diff --git 
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java 
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index f64b60123d..5add56c640 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -39,7 +39,7 @@ public interface SupportsRelationOperations {
     /** Role and group relationship */
     ROLE_GROUP_REL,
     /** Policy and metadata object relationship */
-    POLICY_METADATA_OBJECT_REL,
+    POLICY_METADATA_OBJECT_REL
   }
 
   /**
diff --git a/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
index 284b347a73..546d2401f7 100644
--- a/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
+++ b/core/src/main/java/org/apache/gravitino/meta/StatisticEntity.java
@@ -25,10 +25,11 @@ import org.apache.gravitino.Auditable;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.Field;
 import org.apache.gravitino.HasIdentifier;
+import org.apache.gravitino.MetadataObject;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.stats.StatisticValue;
 
-public class StatisticEntity implements Entity, HasIdentifier, Auditable {
+public abstract class StatisticEntity implements Entity, HasIdentifier, 
Auditable {
   public static final Field ID =
       Field.required("id", Long.class, "The unique identifier of the statistic 
entity.");
   public static final Field NAME =
@@ -38,11 +39,11 @@ public class StatisticEntity implements Entity, 
HasIdentifier, Auditable {
   public static final Field AUDIT_INFO =
       Field.required("audit_info", Audit.class, "The audit details of the 
statistic entity.");
 
-  private Long id;
-  private String name;
-  private StatisticValue<?> value;
-  private AuditInfo auditInfo;
-  private Namespace namespace;
+  protected Long id;
+  protected String name;
+  protected StatisticValue<?> value;
+  protected AuditInfo auditInfo;
+  protected Namespace namespace;
 
   @Override
   public Audit auditInfo() {
@@ -59,11 +60,6 @@ public class StatisticEntity implements Entity, 
HasIdentifier, Auditable {
     return fields;
   }
 
-  @Override
-  public EntityType type() {
-    return EntityType.STATISTIC;
-  }
-
   @Override
   public String name() {
     return name;
@@ -83,45 +79,74 @@ public class StatisticEntity implements Entity, 
HasIdentifier, Auditable {
     return value;
   }
 
-  public static Builder builder() {
-    return new Builder();
+  public static EntityType getStatisticType(MetadataObject.Type type) {
+    switch (type) {
+      case TABLE:
+        return EntityType.TABLE_STATISTIC;
+      default:
+        throw new IllegalArgumentException(
+            "Unsupported metadata object type for statistics: " + type);
+    }
   }
 
-  public static class Builder {
-    private final StatisticEntity statisticEntity;
+  public static <S extends StatisticEntityBuilder<S, E>, E extends 
StatisticEntity> S builder(
+      Entity.EntityType type) {
+    switch (type) {
+      case TABLE_STATISTIC:
+        return (S) TableStatisticEntity.builder();
+      default:
+        throw new IllegalArgumentException("Unsupported statistic entity type: 
" + type);
+    }
+  }
 
-    private Builder() {
-      statisticEntity = new StatisticEntity();
+  interface Builder<SELF extends Builder<SELF, T>, T extends StatisticEntity> {
+    T build();
+  }
+
+  public abstract static class StatisticEntityBuilder<
+          SELF extends Builder<SELF, T>, T extends StatisticEntity>
+      implements Builder<SELF, T> {
+
+    protected Long id;
+    protected String name;
+    protected StatisticValue<?> value;
+    protected AuditInfo auditInfo;
+    protected Namespace namespace;
+
+    public SELF withId(Long id) {
+      this.id = id;
+      return self();
     }
 
-    public Builder withId(Long id) {
-      statisticEntity.id = id;
-      return this;
+    public SELF withName(String name) {
+      this.name = name;
+      return self();
     }
 
-    public Builder withName(String name) {
-      statisticEntity.name = name;
-      return this;
+    public SELF withValue(StatisticValue<?> value) {
+      this.value = value;
+      return self();
     }
 
-    public Builder withValue(StatisticValue<?> value) {
-      statisticEntity.value = value;
-      return this;
+    public SELF withAuditInfo(AuditInfo auditInfo) {
+      this.auditInfo = auditInfo;
+      return self();
     }
 
-    public Builder withAuditInfo(AuditInfo auditInfo) {
-      statisticEntity.auditInfo = auditInfo;
-      return this;
+    public SELF withNamespace(Namespace namespace) {
+      this.namespace = namespace;
+      return self();
     }
 
-    public Builder withNamespace(Namespace namespace) {
-      statisticEntity.namespace = namespace;
-      return this;
+    private SELF self() {
+      return (SELF) this;
     }
 
-    public StatisticEntity build() {
-      statisticEntity.validate();
-      return statisticEntity;
+    protected abstract T internalBuild();
+
+    public T build() {
+      T t = internalBuild();
+      return t;
     }
   }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/meta/TableStatisticEntity.java 
b/core/src/main/java/org/apache/gravitino/meta/TableStatisticEntity.java
new file mode 100644
index 0000000000..04c25403f8
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/meta/TableStatisticEntity.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.meta;
+
+public class TableStatisticEntity extends StatisticEntity {
+  @Override
+  public EntityType type() {
+    return EntityType.TABLE_STATISTIC;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder extends StatisticEntityBuilder<Builder, 
TableStatisticEntity> {
+    @Override
+    protected TableStatisticEntity internalBuild() {
+      TableStatisticEntity entity = new TableStatisticEntity();
+      entity.id = id;
+      entity.name = name;
+      entity.value = value;
+      entity.auditInfo = auditInfo;
+      entity.namespace = namespace;
+      return entity;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java 
b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
new file mode 100644
index 0000000000..c1fc9bc892
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/stats/StatisticManager.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.stats;
+
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
+import org.apache.gravitino.exceptions.UnmodifiableStatisticException;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.StatisticEntity;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.utils.Executable;
+import org.apache.gravitino.utils.MetadataObjectUtil;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StatisticManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(StatisticManager.class);
+
+  private final EntityStore store;
+
+  private final IdGenerator idGenerator;
+
+  public StatisticManager(EntityStore store, IdGenerator idGenerator) {
+    this.store = store;
+    this.idGenerator = idGenerator;
+  }
+
+  public List<Statistic> listStatistics(String metalake, MetadataObject 
metadataObject) {
+    try {
+      NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+      Entity.EntityType type = 
StatisticEntity.getStatisticType(metadataObject.type());
+      return TreeLockUtils.doWithTreeLock(
+          identifier,
+          LockType.READ,
+          () ->
+              store.list(Namespace.fromString(identifier.toString()), 
StatisticEntity.class, type)
+                  .stream()
+                  .map(
+                      entity -> {
+                        String name = entity.name();
+                        StatisticValue<?> value = entity.value();
+                        return new CustomStatistic(name, value);
+                      })
+                  .collect(Collectors.toList()));
+    } catch (NoSuchEntityException nse) {
+      LOG.warn(
+          "Failed to list statistics for metadata object {} in the metalake 
{}: {}",
+          metadataObject.fullName(),
+          metalake,
+          nse.getMessage());
+      throw new NoSuchMetadataObjectException(
+          "The metadata object %s in the metalake %s isn't found",
+          metadataObject.fullName(), metalake);
+    } catch (IOException ioe) {
+      LOG.error(
+          "Failed to list statistics for metadata object {} in the metalake {} 
: {}",
+          metadataObject.fullName(),
+          metalake,
+          ioe.getMessage());
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  public void updateStatistics(
+      String metalake, MetadataObject metadataObject, Map<String, 
StatisticValue<?>> statistics) {
+    try {
+      NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+      List<StatisticEntity> statisticEntities = Lists.newArrayList();
+      for (Map.Entry<String, StatisticValue<?>> entry : statistics.entrySet()) 
{
+        String name = entry.getKey();
+        StatisticValue<?> value = entry.getValue();
+
+        StatisticEntity statistic =
+            
StatisticEntity.builder(StatisticEntity.getStatisticType(metadataObject.type()))
+                .withId(idGenerator.nextId())
+                .withName(name)
+                .withValue(value)
+                .withNamespace(Namespace.fromString(identifier.toString()))
+                .withAuditInfo(
+                    AuditInfo.builder()
+                        
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                        .withCreateTime(Instant.now())
+                        
.withLastModifier(PrincipalUtils.getCurrentPrincipal().getName())
+                        .withLastModifiedTime(Instant.now())
+                        .build())
+                .build();
+        statisticEntities.add(statistic);
+      }
+      TreeLockUtils.doWithTreeLock(
+          identifier,
+          LockType.WRITE,
+          (Executable<Void, IOException>)
+              () -> {
+                store.batchPut(statisticEntities, true);
+                return null;
+              });
+
+    } catch (NoSuchEntityException nse) {
+      LOG.warn(
+          "Failed to update statistics for metadata object {} in the metalake 
{}: {}",
+          metadataObject.fullName(),
+          metalake,
+          nse.getMessage());
+      throw new NoSuchMetadataObjectException(
+          "The metadata object %s in the metalake %s isn't found",
+          metadataObject.fullName(), metalake);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  public boolean dropStatistics(
+      String metalake, MetadataObject metadataObject, List<String> statistics)
+      throws UnmodifiableStatisticException {
+    try {
+      NameIdentifier identifier = MetadataObjectUtil.toEntityIdent(metalake, 
metadataObject);
+      Entity.EntityType type = 
StatisticEntity.getStatisticType(metadataObject.type());
+      List<Pair<NameIdentifier, Entity.EntityType>> idents = 
Lists.newArrayList();
+
+      for (String statistic : statistics) {
+        Pair<NameIdentifier, Entity.EntityType> pair =
+            Pair.of(NameIdentifierUtil.ofStatistic(identifier, statistic), 
type);
+        idents.add(pair);
+      }
+      int deleteCount =
+          TreeLockUtils.doWithTreeLock(
+              identifier, LockType.WRITE, () -> store.batchDelete(idents, 
true));
+      // If deleteCount is 0, it means that the statistics were not found.
+      return deleteCount != 0;
+    } catch (NoSuchEntityException nse) {
+      LOG.warn(
+          "Failed to drop statistics for metadata object {} in the metalake {} 
: {}",
+          metadataObject.fullName(),
+          metalake,
+          nse.getMessage());
+      throw new NoSuchMetadataObjectException(
+          "The metadata object %s in the metalake %s isn't found",
+          metadataObject.fullName(), metalake);
+    } catch (IOException ioe) {
+      LOG.error(
+          "Failed to drop statistics for metadata object {} in the metalake {} 
: {}",
+          metadataObject.fullName(),
+          metalake,
+          ioe.getMessage());
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private static class CustomStatistic implements Statistic {
+
+    private final String name;
+    private final StatisticValue<?> value;
+
+    CustomStatistic(String name, StatisticValue<?> value) {
+      this.name = name;
+      this.value = value;
+    }
+
+    @Override
+    public String name() {
+      return name;
+    }
+
+    @Override
+    public Optional<StatisticValue<?>> value() {
+      return Optional.of(value);
+    }
+
+    @Override
+    public boolean reserved() {
+      return false;
+    }
+
+    @Override
+    public boolean modifiable() {
+      return true;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index a72182d5df..f1168d493f 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -20,13 +20,17 @@
 package org.apache.gravitino.storage.relational;
 
 import static 
org.apache.gravitino.Configs.GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT;
+import static org.apache.gravitino.Entity.EntityType.TABLE;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
@@ -49,6 +53,7 @@ import org.apache.gravitino.meta.ModelVersionEntity;
 import org.apache.gravitino.meta.PolicyEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.StatisticEntity;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.meta.TopicEntity;
@@ -138,6 +143,11 @@ public class JDBCBackend implements RelationalBackend {
             
JobTemplateMetaService.getInstance().listJobTemplatesByNamespace(namespace);
       case JOB:
         return (List<E>) 
JobMetaService.getInstance().listJobsByNamespace(namespace);
+      case TABLE_STATISTIC:
+        return (List<E>)
+            StatisticMetaService.getInstance()
+                .listStatisticsByEntity(
+                    NameIdentifier.parse(namespace.toString()), 
Entity.EntityType.TABLE);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for list operation", entityType);
@@ -375,7 +385,7 @@ public class JDBCBackend implements RelationalBackend {
         return ModelVersionMetaService.getInstance()
             .deleteModelVersionMetasByLegacyTimeline(
                 legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
-      case STATISTIC:
+      case TABLE_STATISTIC:
         return StatisticMetaService.getInstance()
             .deleteStatisticsByLegacyTimeline(
                 legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
@@ -413,7 +423,7 @@ public class JDBCBackend implements RelationalBackend {
       case TAG:
       case MODEL:
       case MODEL_VERSION:
-      case STATISTIC:
+      case TABLE_STATISTIC:
       case JOB_TEMPLATE:
       case JOB:
         // These entity types have not implemented multi-versions, so we can 
skip.
@@ -476,6 +486,76 @@ public class JDBCBackend implements RelationalBackend {
         .associateTagsWithMetadataObject(objectIdent, objectType, tagsToAdd, 
tagsToRemove);
   }
 
+  @Override
+  public int batchDelete(
+      List<Pair<NameIdentifier, Entity.EntityType>> entitiesToDelete, boolean 
cascade)
+      throws IOException {
+    if (entitiesToDelete == null || entitiesToDelete.isEmpty()) {
+      return 0;
+    }
+    Preconditions.checkArgument(
+        1 == 
entitiesToDelete.stream().collect(Collectors.groupingBy(Pair::getRight)).size(),
+        "All entities must be of the same type for batch delete operation.");
+    Entity.EntityType entityType = entitiesToDelete.get(0).getRight();
+    switch (entityType) {
+      case TABLE_STATISTIC:
+        Preconditions.checkArgument(
+            cascade, "Batch delete for statistics must be cascade deleted.");
+        List<NameIdentifier> deleteIdents =
+            
entitiesToDelete.stream().map(Pair::getLeft).collect(Collectors.toList());
+        int namespaceSize =
+            entitiesToDelete.stream()
+                .collect(Collectors.groupingBy(ident -> 
ident.getLeft().namespace()))
+                .size();
+        Preconditions.checkArgument(
+            1 == namespaceSize,
+            "All entities must be in the same namespace for batch delete 
operation.");
+
+        Namespace namespace = deleteIdents.get(0).namespace();
+        return StatisticMetaService.getInstance()
+            .batchDeleteStatisticPOs(
+                NameIdentifier.parse(namespace.toString()),
+                TABLE,
+                
deleteIdents.stream().map(NameIdentifier::name).collect(Collectors.toList()));
+      default:
+        throw new IllegalArgumentException(
+            String.format("Batch delete is not supported for entity type %s", 
entityType.name()));
+    }
+  }
+
+  @Override
+  public <E extends Entity & HasIdentifier> void batchPut(List<E> entities, 
boolean overwritten)
+      throws IOException, EntityAlreadyExistsException {
+    if (entities.isEmpty()) {
+      return;
+    }
+
+    Preconditions.checkArgument(
+        1 == 
entities.stream().collect(Collectors.groupingBy(Entity::type)).size(),
+        "All entities must be of the same type for batchPut operation.");
+    Entity.EntityType entityType = entities.get(0).type();
+
+    switch (entityType) {
+      case TABLE_STATISTIC:
+        Preconditions.checkArgument(overwritten, "Batch put for statistics 
must be overwritten.");
+        List<StatisticEntity> statisticEntities =
+            entities.stream().map(e -> (StatisticEntity) 
e).collect(Collectors.toList());
+        Preconditions.checkArgument(
+            1 == 
entities.stream().collect(Collectors.groupingBy(HasIdentifier::namespace)).size(),
+            "All entities must be in the same namespace for batchPut 
operation.");
+
+        StatisticMetaService.getInstance()
+            .batchInsertStatisticPOsOnDuplicateKeyUpdate(
+                statisticEntities,
+                
NameIdentifier.parse(statisticEntities.get(0).namespace().toString()),
+                Entity.EntityType.TABLE);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            String.format("Batch put is not supported for entity type %s", 
entityType.name()));
+    }
+  }
+
   @Override
   public <E extends Entity & HasIdentifier> List<E> listEntitiesByRelation(
       Type relType, NameIdentifier nameIdentifier, Entity.EntityType 
identType, boolean allFields)
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
index 95a58e86fd..42c21e9a5d 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalBackend.java
@@ -22,6 +22,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.EntityAlreadyExistsException;
@@ -126,6 +127,29 @@ public interface RelationalBackend
   boolean delete(NameIdentifier ident, Entity.EntityType entityType, boolean 
cascade)
       throws IOException;
 
+  /**
+   * Deletes the entities identified by the given name identifiers and entity types.
+   *
+   * @param entitiesToDelete The list of identifiers and their entity types to 
be deleted.
+   * @param cascade True if the entities should be cascade deleted, false otherwise.
+   * @return The count of the deleted entities.
+   * @throws IOException If the store operation fails
+   */
+  int batchDelete(List<Pair<NameIdentifier, Entity.EntityType>> 
entitiesToDelete, boolean cascade)
+      throws IOException;
+
+  /**
+   * Stores a batch of entities, possibly overwriting existing entities if 
specified.
+   *
+   * @param entities The list of entities to be stored.
+   * @param overwritten If true, overwrites existing entities with the same 
identifier.
+   * @param <E> The type of the entities in the list.
+   * @throws IOException If the store operation fails
+   * @throws EntityAlreadyExistsException If an entity already exists and 
overwrite is false.
+   */
+  <E extends Entity & HasIdentifier> void batchPut(List<E> entities, boolean 
overwritten)
+      throws IOException, EntityAlreadyExistsException;
+
   /**
    * Permanently deletes the legacy data that has been marked as deleted 
before the given legacy
    * timeline.
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index 9ef62d656a..1a71fb0070 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Function;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
 import org.apache.gravitino.Entity;
@@ -260,4 +261,17 @@ public class RelationalEntityStore
     return backend.updateEntityRelations(
         relType, srcEntityIdent, srcEntityType, destEntitiesToAdd, 
destEntitiesToRemove);
   }
+
+  @Override
+  public int batchDelete(
+      List<Pair<NameIdentifier, Entity.EntityType>> entitiesToDelete, boolean 
cascade)
+      throws IOException {
+    return backend.batchDelete(entitiesToDelete, cascade); // forwarded unchanged to the backend
+  }
+
+  @Override
+  public <E extends Entity & HasIdentifier> void batchPut(List<E> entities, 
boolean overwritten)
+      throws IOException, EntityAlreadyExistsException {
+    backend.batchPut(entities, overwritten); // forwarded unchanged to the backend
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
index 51dd1e0066..fa3de359b2 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/StatisticPO.java
@@ -57,7 +57,9 @@ public class StatisticPO {
 
   public static StatisticEntity fromStatisticPO(StatisticPO statisticPO) {
     try {
-      return StatisticEntity.builder()
+      return StatisticEntity.builder(
+              StatisticEntity.getStatisticType(
+                  MetadataObject.Type.valueOf(statisticPO.metadataObjectType)))
           .withId(statisticPO.getStatisticId())
           .withName(statisticPO.getStatisticName())
           .withValue(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
index 8b92cc5a25..e455a4c3b2 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/StatisticMetaService.java
@@ -59,13 +59,12 @@ public class StatisticMetaService {
   }
 
   public void batchInsertStatisticPOsOnDuplicateKeyUpdate(
-      List<StatisticEntity> statisticEntities,
-      String metalake,
-      NameIdentifier entity,
-      Entity.EntityType type) {
+      List<StatisticEntity> statisticEntities, NameIdentifier entity, 
Entity.EntityType type) {
     if (statisticEntities == null || statisticEntities.isEmpty()) {
       return;
     }
+
+    String metalake = NameIdentifierUtil.getMetalake(entity);
     Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
     MetadataObject object = NameIdentifierUtil.toMetadataObject(entity, type);
     Long entityId =
diff --git 
a/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java 
b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
new file mode 100644
index 0000000000..e430d9486c
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/stats/TestStatisticManager.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.stats;
+
+import static org.apache.gravitino.Configs.DEFAULT_ENTITY_RELATIONAL_STORE;
+import static 
org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER;
+import static 
org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS;
+import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_URL;
+import static 
org.apache.gravitino.Configs.ENTITY_RELATIONAL_JDBC_BACKEND_WAIT_MILLISECONDS;
+import static org.apache.gravitino.Configs.ENTITY_RELATIONAL_STORE;
+import static org.apache.gravitino.Configs.ENTITY_STORE;
+import static org.apache.gravitino.Configs.RELATIONAL_ENTITY_STORE;
+import static org.apache.gravitino.Configs.STORE_DELETE_AFTER_TIME;
+import static org.apache.gravitino.Configs.STORE_TRANSACTION_MAX_SKEW_TIME;
+import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.VERSION_RETENTION_COUNT;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.EntityStoreFactory;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.MetadataObject;
+import org.apache.gravitino.MetadataObjects;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchMetadataObjectException;
+import org.apache.gravitino.lock.LockManager;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.apache.gravitino.meta.ColumnEntity;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.meta.SchemaVersion;
+import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.storage.IdGenerator;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestStatisticManager {
+  private static final String JDBC_STORE_PATH =
+      "/tmp/gravitino_jdbc_entityStore_" + 
UUID.randomUUID().toString().replace("-", "");
+
+  private static final String DB_DIR = JDBC_STORE_PATH + "/testdb";
+  private static final String METALAKE = "metalake_for_stat_test";
+
+  private static final String CATALOG = "catalog_for_stat_test";
+
+  private static final String SCHEMA = "schema_for_stat_test";
+
+  private static final String TABLE = "table_for_stat_test";
+
+  private static final String COLUMN = "column_for_stat_test";
+  private static final Config config = Mockito.mock(Config.class);
+
+  private static EntityStore entityStore;
+
+  private static IdGenerator idGenerator;
+
+  @BeforeAll
+  public static void setUp() throws IOException, IllegalAccessException {
+    idGenerator = new RandomIdGenerator();
+
+    File dbDir = new File(DB_DIR);
+    dbDir.mkdirs();
+
+    Mockito.when(config.get(ENTITY_STORE)).thenReturn(RELATIONAL_ENTITY_STORE);
+    
Mockito.when(config.get(ENTITY_RELATIONAL_STORE)).thenReturn(DEFAULT_ENTITY_RELATIONAL_STORE);
+    Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_URL))
+        
.thenReturn(String.format("jdbc:h2:file:%s;DB_CLOSE_DELAY=-1;MODE=MYSQL", 
DB_DIR));
+    
Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_DRIVER)).thenReturn("org.h2.Driver");
+    
Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_MAX_CONNECTIONS)).thenReturn(100);
+    
Mockito.when(config.get(ENTITY_RELATIONAL_JDBC_BACKEND_WAIT_MILLISECONDS)).thenReturn(1000L);
+    
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);
+    Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(20 * 60 * 
1000L);
+    Mockito.when(config.get(VERSION_RETENTION_COUNT)).thenReturn(1L);
+    // Stub fixed cache settings on the mocked config for this test
+    Mockito.when(config.get(Configs.CACHE_ENABLED)).thenReturn(true);
+    Mockito.when(config.get(Configs.CACHE_MAX_ENTRIES)).thenReturn(10_000);
+    
Mockito.when(config.get(Configs.CACHE_EXPIRATION_TIME)).thenReturn(3_600_000L);
+    Mockito.when(config.get(Configs.CACHE_WEIGHER_ENABLED)).thenReturn(true);
+    Mockito.when(config.get(Configs.CACHE_STATS_ENABLED)).thenReturn(false);
+    
Mockito.when(config.get(Configs.CACHE_IMPLEMENTATION)).thenReturn("caffeine");
+
+    Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+    Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+    Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
+
+    entityStore = EntityStoreFactory.createEntityStore(config);
+    entityStore.initialize(config);
+
+    AuditInfo audit = 
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+    BaseMetalake metalake =
+        BaseMetalake.builder()
+            .withId(idGenerator.nextId())
+            .withName(METALAKE)
+            .withVersion(SchemaVersion.V_0_1)
+            .withComment("Test metalake")
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(metalake, false /* overwritten */);
+
+    CatalogEntity catalog =
+        CatalogEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(CATALOG)
+            .withNamespace(Namespace.of(METALAKE))
+            .withType(Catalog.Type.RELATIONAL)
+            .withProvider("test")
+            .withComment("Test catalog")
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(catalog, false /* overwritten */);
+
+    SchemaEntity schema =
+        SchemaEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(SCHEMA)
+            .withNamespace(Namespace.of(METALAKE, CATALOG))
+            .withComment("Test schema")
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(schema, false /* overwritten */);
+
+    ColumnEntity column =
+        ColumnEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(COLUMN)
+            .withPosition(0)
+            .withComment("Test column")
+            .withDataType(Types.IntegerType.get())
+            .withNullable(true)
+            .withAutoIncrement(false)
+            .withAuditInfo(audit)
+            .build();
+
+    TableEntity table =
+        TableEntity.builder()
+            .withId(idGenerator.nextId())
+            .withName(TABLE)
+            .withColumns(Lists.newArrayList(column))
+            .withNamespace(Namespace.of(METALAKE, CATALOG, SCHEMA))
+            .withAuditInfo(audit)
+            .build();
+    entityStore.put(table, false /* overwritten */);
+  }
+
+  @AfterAll
+  public static void tearDown() throws IOException {
+    if (entityStore != null) {
+      entityStore.close();
+      entityStore = null;
+    }
+
+    FileUtils.deleteDirectory(new File(JDBC_STORE_PATH));
+  }
+
+  @Test
+  public void testStatisticLifeCycle() {
+    StatisticManager statisticManager = new StatisticManager(entityStore, 
idGenerator);
+
+    MetadataObject tableObject =
+        MetadataObjects.of(Lists.newArrayList(CATALOG, SCHEMA, TABLE), 
MetadataObject.Type.TABLE);
+    Map<String, StatisticValue<?>> stats = Maps.newHashMap();
+    // Initial update: one statistic per value kind (string, long, double, boolean, list, object)
+    stats.put("a", StatisticValues.stringValue("1"));
+    stats.put("b", StatisticValues.longValue(1L));
+    stats.put("c", StatisticValues.doubleValue(1.0));
+    stats.put("d", StatisticValues.booleanValue(true));
+    stats.put(
+        "e",
+        StatisticValues.listValue(
+            Lists.newArrayList(
+                StatisticValues.stringValue("1"), 
StatisticValues.stringValue("2"))));
+
+    Map<String, StatisticValue<?>> map = Maps.newHashMap();
+    map.put("x", StatisticValues.stringValue("1"));
+    map.put("y", StatisticValues.longValue(2L));
+    stats.put("f", StatisticValues.objectValue(map));
+    statisticManager.updateStatistics(METALAKE, tableObject, stats);
+
+    List<Statistic> statistics = statisticManager.listStatistics(METALAKE, 
tableObject);
+    Assertions.assertEquals(6, statistics.size());
+    for (Statistic statistic : statistics) {
+      Assertions.assertTrue(
+          stats.containsKey(statistic.name()),
+          "Statistic name should be in the updated stats: " + 
statistic.name());
+      StatisticValue<?> value = stats.get(statistic.name());
+      Assertions.assertEquals(
+          value, statistic.value().get(), "Statistic value type mismatch: " + 
statistic.name());
+    }
+
+    // Partial update: overwrite existing "f" and add new "x"; the other statistics stay as-is
+    Map<String, StatisticValue<?>> expectStats = Maps.newHashMap();
+    expectStats.putAll(stats);
+    stats.clear();
+    stats.put("f", StatisticValues.longValue(2L));
+    stats.put("x", StatisticValues.longValue(2L));
+
+    expectStats.put("f", StatisticValues.longValue(2L));
+    expectStats.put("x", StatisticValues.longValue(2));
+
+    statisticManager.updateStatistics(METALAKE, tableObject, stats);
+    statistics = statisticManager.listStatistics(METALAKE, tableObject);
+    Assertions.assertEquals(7, statistics.size());
+    for (Statistic statistic : statistics) {
+      Assertions.assertTrue(
+          expectStats.containsKey(statistic.name()),
+          "Statistic name should be in the updated stats: " + 
statistic.name());
+      StatisticValue<?> value = expectStats.get(statistic.name());
+      Assertions.assertEquals(
+          value, statistic.value().get(), "Statistic value type mismatch: " + 
statistic.name());
+    }
+
+    // Drop statistics "a", "b" and "c"; the four remaining ones must keep their values
+    expectStats.remove("a");
+    expectStats.remove("b");
+    expectStats.remove("c");
+    List<String> statNames = Lists.newArrayList("a", "b", "c");
+    statisticManager.dropStatistics(METALAKE, tableObject, statNames);
+    statistics = statisticManager.listStatistics(METALAKE, tableObject);
+    Assertions.assertEquals(4, statistics.size());
+
+    for (Statistic statistic : statistics) {
+      Assertions.assertTrue(
+          expectStats.containsKey(statistic.name()),
+          "Statistic name should be in the updated stats: " + 
statistic.name());
+      StatisticValue<?> value = expectStats.get(statistic.name());
+      Assertions.assertEquals(
+          value, statistic.value().get(), "Statistic value type mismatch: " + 
statistic.name());
+    }
+
+    // Listing statistics of a non-existent metadata object must throw
+    MetadataObject notExistObject =
+        MetadataObjects.of(
+            Lists.newArrayList(CATALOG, SCHEMA, "not-exist"), 
MetadataObject.Type.TABLE);
+    Assertions.assertThrows(
+        NoSuchMetadataObjectException.class,
+        () -> statisticManager.listStatistics(METALAKE, notExistObject));
+
+    // Updating statistics of a non-existent metadata object must throw
+    Assertions.assertThrows(
+        NoSuchMetadataObjectException.class,
+        () -> statisticManager.updateStatistics(METALAKE, notExistObject, 
stats));
+
+    // Dropping statistics of a non-existent metadata object must throw
+    Assertions.assertThrows(
+        NoSuchMetadataObjectException.class,
+        () -> statisticManager.dropStatistics(METALAKE, notExistObject, 
statNames));
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java
 
b/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java
index bdb37895d2..725d440dde 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/memory/TestMemoryEntityStore.java
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Entity;
 import org.apache.gravitino.Entity.EntityType;
@@ -170,6 +171,19 @@ public class TestMemoryEntityStore {
       }
     }
 
+    @Override
+    public int batchDelete(List<Pair<NameIdentifier, EntityType>> 
entitiesToDelete, boolean cascade)
+        throws IOException { // always throws: batch delete is unsupported by this store
+      throw new UnsupportedOperationException(
+          "Batch delete is not supported in InMemoryEntityStore.");
+    }
+
+    @Override
+    public <E extends Entity & HasIdentifier> void batchPut(List<E> entities, 
boolean overwritten)
+        throws IOException, EntityAlreadyExistsException { // always throws: batch put unsupported
+      throw new UnsupportedOperationException("Batch put is not supported in 
InMemoryEntityStore.");
+    }
+
     @Override
     public void close() throws IOException {
       entityMap.clear();
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
index a45705377d..1ffd191ae6 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestStatisticMetaService.java
@@ -31,6 +31,7 @@ import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.StatisticEntity;
 import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TableStatisticEntity;
 import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.stats.StatisticValues;
 import org.apache.gravitino.storage.RandomIdGenerator;
@@ -76,7 +77,7 @@ public class TestStatisticMetaService extends TestJDBCBackend 
{
     statisticEntities.add(statisticEntity);
 
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, table.nameIdentifier(), 
Entity.EntityType.TABLE);
+        statisticEntities, table.nameIdentifier(), Entity.EntityType.TABLE);
 
     List<StatisticEntity> listEntities =
         statisticMetaService.listStatisticsByEntity(
@@ -90,7 +91,7 @@ public class TestStatisticMetaService extends TestJDBCBackend 
{
     statisticEntities.clear();
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, table.nameIdentifier(), 
Entity.EntityType.TABLE);
+        statisticEntities, table.nameIdentifier(), Entity.EntityType.TABLE);
 
     listEntities =
         statisticMetaService.listStatisticsByEntity(
@@ -165,25 +166,25 @@ public class TestStatisticMetaService extends 
TestJDBCBackend {
     StatisticEntity statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, table.nameIdentifier(), 
Entity.EntityType.TABLE);
+        statisticEntities, table.nameIdentifier(), Entity.EntityType.TABLE);
 
     statisticEntities.clear();
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, topic.nameIdentifier(), 
Entity.EntityType.TOPIC);
+        statisticEntities, topic.nameIdentifier(), Entity.EntityType.TOPIC);
 
     statisticEntities.clear();
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, fileset.nameIdentifier(), 
Entity.EntityType.FILESET);
+        statisticEntities, fileset.nameIdentifier(), 
Entity.EntityType.FILESET);
 
     statisticEntities.clear();
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, model.nameIdentifier(), 
Entity.EntityType.MODEL);
+        statisticEntities, model.nameIdentifier(), Entity.EntityType.MODEL);
 
     // assert stats
     Assertions.assertEquals(4, countActiveStats(metalake.id()));
@@ -278,25 +279,25 @@ public class TestStatisticMetaService extends 
TestJDBCBackend {
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, table.nameIdentifier(), 
Entity.EntityType.TABLE);
+        statisticEntities, table.nameIdentifier(), Entity.EntityType.TABLE);
 
     statisticEntities.clear();
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, topic.nameIdentifier(), 
Entity.EntityType.TOPIC);
+        statisticEntities, topic.nameIdentifier(), Entity.EntityType.TOPIC);
 
     statisticEntities.clear();
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, fileset.nameIdentifier(), 
Entity.EntityType.FILESET);
+        statisticEntities, fileset.nameIdentifier(), 
Entity.EntityType.FILESET);
 
     statisticEntities.clear();
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, model.nameIdentifier(), 
Entity.EntityType.MODEL);
+        statisticEntities, model.nameIdentifier(), Entity.EntityType.MODEL);
 
     // assert stats
     Assertions.assertEquals(4, countActiveStats(metalake.id()));
@@ -359,25 +360,25 @@ public class TestStatisticMetaService extends 
TestJDBCBackend {
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, table.nameIdentifier(), 
Entity.EntityType.TABLE);
+        statisticEntities, table.nameIdentifier(), Entity.EntityType.TABLE);
 
     statisticEntities.clear();
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, topic.nameIdentifier(), 
Entity.EntityType.TOPIC);
+        statisticEntities, topic.nameIdentifier(), Entity.EntityType.TOPIC);
 
     statisticEntities.clear();
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, fileset.nameIdentifier(), 
Entity.EntityType.FILESET);
+        statisticEntities, fileset.nameIdentifier(), 
Entity.EntityType.FILESET);
 
     statisticEntities.clear();
     statisticEntity = createStatisticEntity(auditInfo, 100L);
     statisticEntities.add(statisticEntity);
     statisticMetaService.batchInsertStatisticPOsOnDuplicateKeyUpdate(
-        statisticEntities, metalakeName, model.nameIdentifier(), 
Entity.EntityType.MODEL);
+        statisticEntities, model.nameIdentifier(), Entity.EntityType.MODEL);
 
     // assert stats count
     Assertions.assertEquals(4, countActiveStats(metalake.id()));
@@ -392,7 +393,7 @@ public class TestStatisticMetaService extends 
TestJDBCBackend {
   }
 
   private static StatisticEntity createStatisticEntity(AuditInfo auditInfo, 
long value) {
-    return StatisticEntity.builder()
+    return TableStatisticEntity.builder()
         .withId(RandomIdGenerator.INSTANCE.nextId())
         .withName("test")
         .withValue(StatisticValues.longValue(value))
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
index 31211eab23..9d72d31fa9 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
@@ -58,6 +58,7 @@ import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.meta.StatisticEntity;
 import org.apache.gravitino.meta.TableEntity;
+import org.apache.gravitino.meta.TableStatisticEntity;
 import org.apache.gravitino.meta.TagEntity;
 import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.policy.Policy;
@@ -1212,7 +1213,7 @@ public class TestPOConverters {
   public void testStatisticPO() throws JsonProcessingException {
     List<StatisticEntity> statisticEntities = Lists.newArrayList();
     statisticEntities.add(
-        StatisticEntity.builder()
+        TableStatisticEntity.builder()
             .withId(1L)
             .withName("test_statistic")
             .withNamespace(
@@ -1241,7 +1242,7 @@ public class TestPOConverters {
             .withStatisticName("test")
             .withStatisticValue("\"test\"")
             .withMetadataObjectId(1L)
-            .withMetadataObjectType("CATALOG")
+            .withMetadataObjectType("TABLE")
             .withDeletedAt(0L)
             .withMetalakeId(1L)
             .withAuditInfo(

Reply via email to