This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 74ecaeb93 [#5602] feat(core): Add storage schema for model (part-1) 
(#5689)
74ecaeb93 is described below

commit 74ecaeb933e931db67f471c580dfcaef5329be73
Author: Jerry Shao <jerrys...@datastrato.com>
AuthorDate: Mon Dec 2 10:45:38 2024 +0800

    [#5602] feat(core): Add storage schema for model (part-1) (#5689)
    
    ### What changes were proposed in this pull request?
    
    This PR adds the 1st part of storage schema for model metadata.
    
    ### Why are the changes needed?
    
    Fix: #5602
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Add various UTs to cover the code.
---
 .../gravitino/storage/relational/JDBCBackend.java  |  15 +-
 .../storage/relational/mapper/ModelMetaMapper.java |  84 +++++++++
 .../mapper/ModelMetaSQLProviderFactory.java        | 100 ++++++++++
 .../provider/base/ModelMetaBaseSQLProvider.java    | 137 ++++++++++++++
 .../postgresql/ModelMetaPostgreSQLProvider.java    |  86 +++++++++
 .../gravitino/storage/relational/po/ModelPO.java   | 129 +++++++++++++
 .../relational/po/ModelVersionAliasRelPO.java      |  83 +++++++++
 .../storage/relational/po/ModelVersionPO.java      | 130 +++++++++++++
 .../relational/service/CatalogMetaService.java     |   7 +-
 .../relational/service/MetadataObjectService.java  |  18 ++
 .../relational/service/MetalakeMetaService.java    |   7 +-
 .../relational/service/ModelMetaService.java       | 189 +++++++++++++++++++
 .../relational/service/SchemaMetaService.java      |  20 +-
 .../session/SqlSessionFactoryHelper.java           |   2 +
 .../storage/relational/utils/POConverters.java     |  37 ++++
 .../gravitino/storage/TestEntityStorage.java       | 194 ++++++++++++++++++--
 .../storage/relational/TestJDBCBackend.java        |  89 +++++++++
 .../relational/service/TestModelMetaService.java   | 202 +++++++++++++++++++++
 .../service/TestTableColumnMetaService.java        |  36 +---
 .../storage/relational/utils/TestPOConverters.java | 156 ++++++++++++++++
 scripts/h2/schema-0.8.0-h2.sql                     |  47 +++++
 scripts/h2/upgrade-0.7.0-to-0.8.0-h2.sql           |  47 +++++
 scripts/mysql/schema-0.8.0-mysql.sql               |  47 +++++
 scripts/mysql/upgrade-0.7.0-to-0.8.0-mysql.sql     |  47 +++++
 scripts/postgresql/schema-0.8.0-postgresql.sql     |  84 +++++++++
 .../upgrade-0.7.0-to-0.8.0-postgresql.sql          |  84 +++++++++
 26 files changed, 2026 insertions(+), 51 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index f45778e97..961257808 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -42,6 +42,7 @@ import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.TableEntity;
@@ -54,6 +55,7 @@ import 
org.apache.gravitino.storage.relational.service.CatalogMetaService;
 import org.apache.gravitino.storage.relational.service.FilesetMetaService;
 import org.apache.gravitino.storage.relational.service.GroupMetaService;
 import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
+import org.apache.gravitino.storage.relational.service.ModelMetaService;
 import org.apache.gravitino.storage.relational.service.OwnerMetaService;
 import org.apache.gravitino.storage.relational.service.RoleMetaService;
 import org.apache.gravitino.storage.relational.service.SchemaMetaService;
@@ -111,6 +113,8 @@ public class JDBCBackend implements RelationalBackend {
         return (List<E>) 
RoleMetaService.getInstance().listRolesByNamespace(namespace);
       case GROUP:
         return (List<E>) 
GroupMetaService.getInstance().listGroupsByNamespace(namespace, allFields);
+      case MODEL:
+        return (List<E>) 
ModelMetaService.getInstance().listModelsByNamespace(namespace);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for list operation", entityType);
@@ -150,6 +154,8 @@ public class JDBCBackend implements RelationalBackend {
       GroupMetaService.getInstance().insertGroup((GroupEntity) e, overwritten);
     } else if (e instanceof TagEntity) {
       TagMetaService.getInstance().insertTag((TagEntity) e, overwritten);
+    } else if (e instanceof ModelEntity) {
+      ModelMetaService.getInstance().insertModel((ModelEntity) e, overwritten);
     } else {
       throw new UnsupportedEntityTypeException(
           "Unsupported entity type: %s for insert operation", e.getClass());
@@ -212,6 +218,8 @@ public class JDBCBackend implements RelationalBackend {
         return (E) RoleMetaService.getInstance().getRoleByIdentifier(ident);
       case TAG:
         return (E) TagMetaService.getInstance().getTagByIdentifier(ident);
+      case MODEL:
+        return (E) ModelMetaService.getInstance().getModelByIdentifier(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for get operation", entityType);
@@ -242,6 +250,8 @@ public class JDBCBackend implements RelationalBackend {
         return RoleMetaService.getInstance().deleteRole(ident);
       case TAG:
         return TagMetaService.getInstance().deleteTag(ident);
+      case MODEL:
+        return ModelMetaService.getInstance().deleteModel(ident);
       default:
         throw new UnsupportedEntityTypeException(
             "Unsupported entity type: %s for delete operation", entityType);
@@ -295,10 +305,13 @@ public class JDBCBackend implements RelationalBackend {
       case COLUMN:
         return TableColumnMetaService.getInstance()
             .deleteColumnsByLegacyTimeline(legacyTimeline, 
GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
-      case AUDIT:
       case MODEL:
+        return ModelMetaService.getInstance()
+            .deleteModelMetasByLegacyTimeline(
+                legacyTimeline, GARBAGE_COLLECTOR_SINGLE_DELETION_LIMIT);
       case MODEL_VERSION:
         // TODO (jerryshao): Implement hard delete logic for these entity 
types.
+      case AUDIT:
         return 0;
         // TODO: Implement hard delete logic for these entity types.
 
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
new file mode 100644
index 000000000..5b3c4a93f
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaMapper.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import java.util.List;
+import org.apache.gravitino.storage.relational.po.ModelPO;
+import org.apache.ibatis.annotations.DeleteProvider;
+import org.apache.ibatis.annotations.InsertProvider;
+import org.apache.ibatis.annotations.Param;
+import org.apache.ibatis.annotations.SelectProvider;
+import org.apache.ibatis.annotations.UpdateProvider;
+
+public interface ModelMetaMapper {
+  String TABLE_NAME = "model_meta";
+
+  @InsertProvider(type = ModelMetaSQLProviderFactory.class, method = 
"insertModelMeta")
+  void insertModelMeta(@Param("modelMeta") ModelPO modelPO);
+
+  @InsertProvider(
+      type = ModelMetaSQLProviderFactory.class,
+      method = "insertModelMetaOnDuplicateKeyUpdate")
+  void insertModelMetaOnDuplicateKeyUpdate(@Param("modelMeta") ModelPO 
modelPO);
+
+  @SelectProvider(type = ModelMetaSQLProviderFactory.class, method = 
"listModelPOsBySchemaId")
+  List<ModelPO> listModelPOsBySchemaId(@Param("schemaId") Long schemaId);
+
+  @SelectProvider(
+      type = ModelMetaSQLProviderFactory.class,
+      method = "selectModelMetaBySchemaIdAndModelName")
+  ModelPO selectModelMetaBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName);
+
+  @SelectProvider(
+      type = ModelMetaSQLProviderFactory.class,
+      method = "selectModelIdBySchemaIdAndModelName")
+  Long selectModelIdBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName);
+
+  @SelectProvider(type = ModelMetaSQLProviderFactory.class, method = 
"selectModelMetaByModelId")
+  ModelPO selectModelMetaByModelId(@Param("modelId") Long modelId);
+
+  @UpdateProvider(
+      type = ModelMetaSQLProviderFactory.class,
+      method = "softDeleteModelMetaBySchemaIdAndModelName")
+  Integer softDeleteModelMetaBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName);
+
+  @UpdateProvider(
+      type = ModelMetaSQLProviderFactory.class,
+      method = "softDeleteModelMetasByCatalogId")
+  Integer softDeleteModelMetasByCatalogId(@Param("catalogId") Long catalogId);
+
+  @UpdateProvider(
+      type = ModelMetaSQLProviderFactory.class,
+      method = "softDeleteModelMetasByMetalakeId")
+  Integer softDeleteModelMetasByMetalakeId(@Param("metalakeId") Long 
metalakeId);
+
+  @UpdateProvider(
+      type = ModelMetaSQLProviderFactory.class,
+      method = "softDeleteModelMetasBySchemaId")
+  Integer softDeleteModelMetasBySchemaId(@Param("schemaId") Long schemaId);
+
+  @DeleteProvider(
+      type = ModelMetaSQLProviderFactory.class,
+      method = "deleteModelMetasByLegacyTimeline")
+  Integer deleteModelMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit);
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
new file mode 100644
index 000000000..74334ec6e
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/ModelMetaSQLProviderFactory.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.gravitino.storage.relational.JDBCBackend.JDBCBackendType;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.ModelMetaBaseSQLProvider;
+import 
org.apache.gravitino.storage.relational.mapper.provider.postgresql.ModelMetaPostgreSQLProvider;
+import org.apache.gravitino.storage.relational.po.ModelPO;
+import org.apache.gravitino.storage.relational.session.SqlSessionFactoryHelper;
+import org.apache.ibatis.annotations.Param;
+
+public class ModelMetaSQLProviderFactory {
+
+  static class ModelMetaMySQLProvider extends ModelMetaBaseSQLProvider {}
+
+  static class ModelMetaH2Provider extends ModelMetaBaseSQLProvider {}
+
+  private static final Map<JDBCBackendType, ModelMetaBaseSQLProvider> 
MODEL_META_SQL_PROVIDER_MAP =
+      ImmutableMap.of(
+          JDBCBackendType.MYSQL, new ModelMetaMySQLProvider(),
+          JDBCBackendType.H2, new ModelMetaH2Provider(),
+          JDBCBackendType.POSTGRESQL, new ModelMetaPostgreSQLProvider());
+
+  public static ModelMetaBaseSQLProvider getProvider() {
+    String databaseId =
+        SqlSessionFactoryHelper.getInstance()
+            .getSqlSessionFactory()
+            .getConfiguration()
+            .getDatabaseId();
+
+    JDBCBackendType jdbcBackendType = JDBCBackendType.fromString(databaseId);
+    return MODEL_META_SQL_PROVIDER_MAP.get(jdbcBackendType);
+  }
+
+  public static String insertModelMeta(@Param("modelMeta") ModelPO modelPO) {
+    return getProvider().insertModelMeta(modelPO);
+  }
+
+  public static String insertModelMetaOnDuplicateKeyUpdate(@Param("modelMeta") 
ModelPO modelPO) {
+    return getProvider().insertModelMetaOnDuplicateKeyUpdate(modelPO);
+  }
+
+  public static String listModelPOsBySchemaId(@Param("schemaId") Long 
schemaId) {
+    return getProvider().listModelPOsBySchemaId(schemaId);
+  }
+
+  public static String selectModelMetaBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return getProvider().selectModelMetaBySchemaIdAndModelName(schemaId, 
modelName);
+  }
+
+  public static String selectModelIdBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return getProvider().selectModelIdBySchemaIdAndModelName(schemaId, 
modelName);
+  }
+
+  public static String selectModelMetaByModelId(@Param("modelId") Long 
modelId) {
+    return getProvider().selectModelMetaByModelId(modelId);
+  }
+
+  public static String softDeleteModelMetaBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return getProvider().softDeleteModelMetaBySchemaIdAndModelName(schemaId, 
modelName);
+  }
+
+  public static String softDeleteModelMetasByCatalogId(@Param("catalogId") 
Long catalogId) {
+    return getProvider().softDeleteModelMetasByCatalogId(catalogId);
+  }
+
+  public static String softDeleteModelMetasByMetalakeId(@Param("metalakeId") 
Long metalakeId) {
+    return getProvider().softDeleteModelMetasByMetalakeId(metalakeId);
+  }
+
+  public static String softDeleteModelMetasBySchemaId(@Param("schemaId") Long 
schemaId) {
+    return getProvider().softDeleteModelMetasBySchemaId(schemaId);
+  }
+
+  public static String deleteModelMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return getProvider().deleteModelMetasByLegacyTimeline(legacyTimeline, 
limit);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
new file mode 100644
index 000000000..cae5b2d9d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/base/ModelMetaBaseSQLProvider.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.base;
+
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import org.apache.gravitino.storage.relational.po.ModelPO;
+import org.apache.ibatis.annotations.Param;
+
+public class ModelMetaBaseSQLProvider {
+
+  public String insertModelMeta(@Param("modelMeta") ModelPO modelPO) {
+    return "INSERT INTO "
+        + ModelMetaMapper.TABLE_NAME
+        + " (model_id, model_name, metalake_id, catalog_id, schema_id,"
+        + " model_comment, model_properties, model_latest_version, audit_info, 
deleted_at)"
+        + " VALUES (#{modelMeta.modelId}, #{modelMeta.modelName}, 
#{modelMeta.metalakeId},"
+        + " #{modelMeta.catalogId}, #{modelMeta.schemaId}, 
#{modelMeta.modelComment},"
+        + " #{modelMeta.modelProperties}, #{modelMeta.modelLatestVersion}, 
#{modelMeta.auditInfo},"
+        + " #{modelMeta.deletedAt})";
+  }
+
+  public String insertModelMetaOnDuplicateKeyUpdate(@Param("modelMeta") 
ModelPO modelPO) {
+    return "INSERT INTO "
+        + ModelMetaMapper.TABLE_NAME
+        + " (model_id, model_name, metalake_id, catalog_id, schema_id,"
+        + " model_comment, model_properties, model_latest_version, audit_info, 
deleted_at)"
+        + " VALUES (#{modelMeta.modelId}, #{modelMeta.modelName}, 
#{modelMeta.metalakeId},"
+        + " #{modelMeta.catalogId}, #{modelMeta.schemaId}, 
#{modelMeta.modelComment},"
+        + " #{modelMeta.modelProperties}, #{modelMeta.modelLatestVersion}, 
#{modelMeta.auditInfo},"
+        + " #{modelMeta.deletedAt})"
+        + " ON DUPLICATE KEY UPDATE"
+        + " model_name = #{modelMeta.modelName},"
+        + " metalake_id = #{modelMeta.metalakeId},"
+        + " catalog_id = #{modelMeta.catalogId},"
+        + " schema_id = #{modelMeta.schemaId},"
+        + " model_comment = #{modelMeta.modelComment},"
+        + " model_properties = #{modelMeta.modelProperties},"
+        + " model_latest_version = #{modelMeta.modelLatestVersion},"
+        + " audit_info = #{modelMeta.auditInfo},"
+        + " deleted_at = #{modelMeta.deletedAt}";
+  }
+
+  public String listModelPOsBySchemaId(@Param("schemaId") Long schemaId) {
+    return "SELECT model_id AS modelId, model_name AS modelName, metalake_id 
AS metalakeId,"
+        + " catalog_id AS catalogId, schema_id AS schemaId, model_comment AS 
modelComment,"
+        + " model_properties AS modelProperties, model_latest_version AS"
+        + " modelLatestVersion, audit_info AS auditInfo, deleted_at AS 
deletedAt"
+        + " FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  public String selectModelMetaBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return "SELECT model_id AS modelId, model_name AS modelName, metalake_id 
AS metalakeId,"
+        + " catalog_id AS catalogId, schema_id AS schemaId, model_comment AS 
modelComment,"
+        + " model_properties AS modelProperties, model_latest_version AS"
+        + " modelLatestVersion, audit_info AS auditInfo, deleted_at AS 
deletedAt"
+        + " FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE schema_id = #{schemaId} AND model_name = #{modelName} AND 
deleted_at = 0";
+  }
+
+  public String selectModelIdBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return "SELECT model_id"
+        + " FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE schema_id = #{schemaId} AND model_name = #{modelName} AND 
deleted_at = 0";
+  }
+
+  public String selectModelMetaByModelId(@Param("modelId") Long modelId) {
+    return "SELECT model_id AS modelId, model_name AS modelName, metalake_id 
AS metalakeId,"
+        + " catalog_id AS catalogId, schema_id AS schemaId, model_comment AS 
modelComment,"
+        + " model_properties AS modelProperties, model_latest_version AS "
+        + " modelLatestVersion, audit_info AS auditInfo, deleted_at AS 
deletedAt"
+        + " FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE model_id = #{modelId} AND deleted_at = 0";
+  }
+
+  public String softDeleteModelMetaBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return "UPDATE "
+        + ModelMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE schema_id = #{schemaId} AND model_name = #{modelName} AND 
deleted_at = 0";
+  }
+
+  public String softDeleteModelMetasByCatalogId(@Param("catalogId") Long 
catalogId) {
+    return "UPDATE "
+        + ModelMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  public String softDeleteModelMetasByMetalakeId(@Param("metalakeId") Long 
metalakeId) {
+    return "UPDATE "
+        + ModelMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  public String softDeleteModelMetasBySchemaId(@Param("schemaId") Long 
schemaId) {
+    return "UPDATE "
+        + ModelMetaMapper.TABLE_NAME
+        + " SET deleted_at = (UNIX_TIMESTAMP() * 1000.0)"
+        + " + EXTRACT(MICROSECOND FROM CURRENT_TIMESTAMP(3)) / 1000"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+
+  public String deleteModelMetasByLegacyTimeline(
+      @Param("legacyTimeline") Long legacyTimeline, @Param("limit") int limit) 
{
+    return "DELETE FROM "
+        + ModelMetaMapper.TABLE_NAME
+        + " WHERE deleted_at > 0 AND deleted_at < #{legacyTimeline} LIMIT 
#{limit}";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/ModelMetaPostgreSQLProvider.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/ModelMetaPostgreSQLProvider.java
new file mode 100644
index 000000000..8f62252aa
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/mapper/provider/postgresql/ModelMetaPostgreSQLProvider.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.mapper.provider.postgresql;
+
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import 
org.apache.gravitino.storage.relational.mapper.provider.base.ModelMetaBaseSQLProvider;
+import org.apache.gravitino.storage.relational.po.ModelPO;
+import org.apache.ibatis.annotations.Param;
+
+public class ModelMetaPostgreSQLProvider extends ModelMetaBaseSQLProvider {
+
+  @Override
+  public String insertModelMetaOnDuplicateKeyUpdate(@Param("modelMeta") 
ModelPO modelPO) {
+    return "INSERT INTO "
+        + ModelMetaMapper.TABLE_NAME
+        + "(model_id, model_name, metalake_id, catalog_id, schema_id,"
+        + " model_comment, model_properties, model_latest_version, audit_info, 
deleted_at)"
+        + " VALUES (#{modelMeta.modelId}, #{modelMeta.modelName}, 
#{modelMeta.metalakeId},"
+        + " #{modelMeta.catalogId}, #{modelMeta.schemaId}, 
#{modelMeta.modelComment},"
+        + " #{modelMeta.modelProperties}, #{modelMeta.modelLatestVersion}, 
#{modelMeta.auditInfo},"
+        + " #{modelMeta.deletedAt})"
+        + " ON CONFLICT (model_id) DO UPDATE SET"
+        + " model_name = #{modelMeta.modelName},"
+        + " metalake_id = #{modelMeta.metalakeId},"
+        + " catalog_id = #{modelMeta.catalogId},"
+        + " schema_id = #{modelMeta.schemaId},"
+        + " model_comment = #{modelMeta.modelComment},"
+        + " model_properties = #{modelMeta.modelProperties},"
+        + " model_latest_version = #{modelMeta.modelLatestVersion},"
+        + " audit_info = #{modelMeta.auditInfo},"
+        + " deleted_at = #{modelMeta.deletedAt}";
+  }
+
+  @Override
+  public String softDeleteModelMetaBySchemaIdAndModelName(
+      @Param("schemaId") Long schemaId, @Param("modelName") String modelName) {
+    return "UPDATE "
+        + ModelMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE schema_id = #{schemaId} AND model_name = #{modelName} AND 
deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteModelMetasByCatalogId(@Param("catalogId") Long 
catalogId) {
+    return "UPDATE "
+        + ModelMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE catalog_id = #{catalogId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteModelMetasByMetalakeId(@Param("metalakeId") Long 
metalakeId) {
+    return "UPDATE "
+        + ModelMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE metalake_id = #{metalakeId} AND deleted_at = 0";
+  }
+
+  @Override
+  public String softDeleteModelMetasBySchemaId(@Param("schemaId") Long 
schemaId) {
+    return "UPDATE "
+        + ModelMetaMapper.TABLE_NAME
+        + " SET deleted_at = floor(extract(epoch from((current_timestamp -"
+        + " timestamp '1970-01-01 00:00:00')*1000)))"
+        + " WHERE schema_id = #{schemaId} AND deleted_at = 0";
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelPO.java 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelPO.java
new file mode 100644
index 000000000..008b071d8
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelPO.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+@EqualsAndHashCode
+@Getter
+public class ModelPO {
+
+  private Long modelId;
+
+  private String modelName;
+
+  private Long metalakeId;
+
+  private Long catalogId;
+
+  private Long schemaId;
+
+  private String modelComment;
+
+  private Integer modelLatestVersion;
+
+  private String modelProperties;
+
+  private String auditInfo;
+
+  private Long deletedAt;
+
+  private ModelPO() {}
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private final ModelPO modelPO;
+
+    private Builder() {
+      modelPO = new ModelPO();
+    }
+
+    public Builder withModelId(Long modelId) {
+      modelPO.modelId = modelId;
+      return this;
+    }
+
+    public Builder withModelName(String modelName) {
+      modelPO.modelName = modelName;
+      return this;
+    }
+
+    public Builder withMetalakeId(Long metalakeId) {
+      modelPO.metalakeId = metalakeId;
+      return this;
+    }
+
+    public Builder withCatalogId(Long catalogId) {
+      modelPO.catalogId = catalogId;
+      return this;
+    }
+
+    public Builder withSchemaId(Long schemaId) {
+      modelPO.schemaId = schemaId;
+      return this;
+    }
+
+    public Builder withModelComment(String modelComment) {
+      modelPO.modelComment = modelComment;
+      return this;
+    }
+
+    public Builder withModelLatestVersion(Integer modelLatestVersion) {
+      modelPO.modelLatestVersion = modelLatestVersion;
+      return this;
+    }
+
+    public Builder withModelProperties(String modelProperties) {
+      modelPO.modelProperties = modelProperties;
+      return this;
+    }
+
+    public Builder withAuditInfo(String auditInfo) {
+      modelPO.auditInfo = auditInfo;
+      return this;
+    }
+
+    public Builder withDeletedAt(Long deletedAt) {
+      modelPO.deletedAt = deletedAt;
+      return this;
+    }
+
+    public ModelPO build() {
+      Preconditions.checkArgument(modelPO.modelId != null, "Model id is 
required");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(modelPO.modelName), "Model name cannot be 
empty");
+      Preconditions.checkArgument(modelPO.metalakeId != null, "Metalake id is 
required");
+      Preconditions.checkArgument(modelPO.catalogId != null, "Catalog id is 
required");
+      Preconditions.checkArgument(modelPO.schemaId != null, "Schema id is 
required");
+      Preconditions.checkArgument(
+          modelPO.modelLatestVersion != null, "Model latest version is 
required");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(modelPO.auditInfo), "Audit info cannot be 
empty");
+      Preconditions.checkArgument(modelPO.deletedAt != null, "Deleted at is 
required");
+      return modelPO;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionAliasRelPO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionAliasRelPO.java
new file mode 100644
index 000000000..fc7896b25
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionAliasRelPO.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+@EqualsAndHashCode
+@Getter
+public class ModelVersionAliasRelPO {
+
+  private Long modelId;
+
+  private Integer modelVersion;
+
+  private String modelAlias;
+
+  private Long deletedAt;
+
+  private ModelVersionAliasRelPO() {}
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private final ModelVersionAliasRelPO modelVersionAliasRelPO;
+
+    private Builder() {
+      modelVersionAliasRelPO = new ModelVersionAliasRelPO();
+    }
+
+    public Builder withModelId(Long modelId) {
+      modelVersionAliasRelPO.modelId = modelId;
+      return this;
+    }
+
+    public Builder withModelVersion(Integer modelVersion) {
+      modelVersionAliasRelPO.modelVersion = modelVersion;
+      return this;
+    }
+
+    public Builder withModelAlias(String modelAlias) {
+      modelVersionAliasRelPO.modelAlias = modelAlias;
+      return this;
+    }
+
+    public Builder withDeletedAt(Long deletedAt) {
+      modelVersionAliasRelPO.deletedAt = deletedAt;
+      return this;
+    }
+
+    public ModelVersionAliasRelPO build() {
+      Preconditions.checkArgument(modelVersionAliasRelPO.modelId != null, 
"modelId is required");
+      Preconditions.checkArgument(
+          modelVersionAliasRelPO.modelVersion != null, "modelVersion is 
required");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(modelVersionAliasRelPO.modelAlias), 
"modelAlias is required");
+      Preconditions.checkArgument(
+          modelVersionAliasRelPO.deletedAt != null, "deletedAt is required");
+      return modelVersionAliasRelPO;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionPO.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionPO.java
new file mode 100644
index 000000000..ac5611e2d
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/po/ModelVersionPO.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.po;
+
+import com.google.common.base.Preconditions;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+@EqualsAndHashCode
+@Getter
+public class ModelVersionPO {
+
+  private Long modelId;
+
+  private Long metalakeId;
+
+  private Long catalogId;
+
+  private Long schemaId;
+
+  private Integer modelVersion;
+
+  private String modelVersionComment;
+
+  private String modelVersionProperties;
+
+  private String modelVersionUri;
+
+  private String auditInfo;
+
+  private Long deletedAt;
+
+  private ModelVersionPO() {}
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private final ModelVersionPO modelVersionPO;
+
+    private Builder() {
+      modelVersionPO = new ModelVersionPO();
+    }
+
+    public Builder withModelId(Long modelId) {
+      modelVersionPO.modelId = modelId;
+      return this;
+    }
+
+    public Builder withMetalakeId(Long metalakeId) {
+      modelVersionPO.metalakeId = metalakeId;
+      return this;
+    }
+
+    public Builder withCatalogId(Long catalogId) {
+      modelVersionPO.catalogId = catalogId;
+      return this;
+    }
+
+    public Builder withSchemaId(Long schemaId) {
+      modelVersionPO.schemaId = schemaId;
+      return this;
+    }
+
+    public Builder withModelVersion(Integer modelVersion) {
+      modelVersionPO.modelVersion = modelVersion;
+      return this;
+    }
+
+    public Builder withModelVersionComment(String modelVersionComment) {
+      modelVersionPO.modelVersionComment = modelVersionComment;
+      return this;
+    }
+
+    public Builder withModelVersionProperties(String modelVersionProperties) {
+      modelVersionPO.modelVersionProperties = modelVersionProperties;
+      return this;
+    }
+
+    public Builder withModelVersionUri(String modelVersionUri) {
+      modelVersionPO.modelVersionUri = modelVersionUri;
+      return this;
+    }
+
+    public Builder withAuditInfo(String auditInfo) {
+      modelVersionPO.auditInfo = auditInfo;
+      return this;
+    }
+
+    public Builder withDeletedAt(Long deletedAt) {
+      modelVersionPO.deletedAt = deletedAt;
+      return this;
+    }
+
+    public ModelVersionPO build() {
+      Preconditions.checkArgument(modelVersionPO.modelId != null, "Model id is 
required");
+      Preconditions.checkArgument(modelVersionPO.metalakeId != null, "Metalake 
id is required");
+      Preconditions.checkArgument(modelVersionPO.catalogId != null, "Catalog 
id is required");
+      Preconditions.checkArgument(modelVersionPO.schemaId != null, "Schema id 
is required");
+      Preconditions.checkArgument(modelVersionPO.modelVersion != null, "Model 
version is required");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(modelVersionPO.modelVersionUri),
+          "Model version uri cannot be empty");
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(modelVersionPO.auditInfo), "Audit info cannot 
be empty");
+      Preconditions.checkArgument(modelVersionPO.deletedAt != null, "Deleted 
at is required");
+
+      return modelVersionPO;
+    }
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
index 15f1d1a3c..0dcf0280c 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/CatalogMetaService.java
@@ -36,6 +36,7 @@ import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.storage.relational.mapper.CatalogMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
@@ -240,7 +241,11 @@ public class CatalogMetaService {
           () ->
               SessionUtils.doWithoutCommit(
                   TagMetadataObjectRelMapper.class,
-                  mapper -> 
mapper.softDeleteTagMetadataObjectRelsByCatalogId(catalogId)));
+                  mapper -> 
mapper.softDeleteTagMetadataObjectRelsByCatalogId(catalogId)),
+          () ->
+              SessionUtils.doWithoutCommit(
+                  ModelMetaMapper.class,
+                  mapper -> 
mapper.softDeleteModelMetasByCatalogId(catalogId)));
     } else {
       List<SchemaEntity> schemaEntities =
           SchemaMetaService.getInstance()
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index c32759af5..9834bafa0 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -27,6 +27,7 @@ import org.apache.gravitino.storage.relational.po.CatalogPO;
 import org.apache.gravitino.storage.relational.po.ColumnPO;
 import org.apache.gravitino.storage.relational.po.FilesetPO;
 import org.apache.gravitino.storage.relational.po.MetalakePO;
+import org.apache.gravitino.storage.relational.po.ModelPO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.gravitino.storage.relational.po.TablePO;
 import org.apache.gravitino.storage.relational.po.TopicPO;
@@ -70,6 +71,9 @@ public class MetadataObjectService {
       return 
FilesetMetaService.getInstance().getFilesetIdBySchemaIdAndName(schemaId, 
names.get(2));
     } else if (type == MetadataObject.Type.TOPIC) {
       return 
TopicMetaService.getInstance().getTopicIdBySchemaIdAndName(schemaId, 
names.get(2));
+    } else if (type == MetadataObject.Type.MODEL) {
+      return ModelMetaService.getInstance()
+          .getModelIdBySchemaIdAndModelName(schemaId, names.get(2));
     }
 
     long tableId =
@@ -174,6 +178,20 @@ public class MetadataObjectService {
           }
           break;
 
+        case MODEL:
+          ModelPO modelPO = 
ModelMetaService.getInstance().getModelPOById(objectId);
+          if (modelPO != null) {
+            fullName =
+                fullName != null
+                    ? DOT_JOINER.join(modelPO.getModelName(), fullName)
+                    : modelPO.getModelName();
+            objectId = modelPO.getSchemaId();
+            metadataType = MetadataObject.Type.SCHEMA;
+          } else {
+            return null;
+          }
+          break;
+
         case COLUMN:
           ColumnPO columnPO = 
TableColumnMetaService.getInstance().getColumnPOById(objectId);
           if (columnPO != null) {
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
index cf05af812..8fa94d4d7 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetalakeMetaService.java
@@ -37,6 +37,7 @@ import 
org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.RoleMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
@@ -243,7 +244,11 @@ public class MetalakeMetaService {
             () ->
                 SessionUtils.doWithoutCommit(
                     OwnerMetaMapper.class,
-                    mapper -> 
mapper.softDeleteOwnerRelByMetalakeId(metalakeId)));
+                    mapper -> 
mapper.softDeleteOwnerRelByMetalakeId(metalakeId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    ModelMetaMapper.class,
+                    mapper -> 
mapper.softDeleteModelMetasByMetalakeId(metalakeId)));
       } else {
         List<CatalogEntity> catalogEntities =
             CatalogMetaService.getInstance()
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
new file mode 100644
index 000000000..2cb16bd07
--- /dev/null
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ModelMetaService.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.storage.relational.service;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
+import org.apache.gravitino.storage.relational.po.ModelPO;
+import org.apache.gravitino.storage.relational.utils.ExceptionUtils;
+import org.apache.gravitino.storage.relational.utils.POConverters;
+import org.apache.gravitino.storage.relational.utils.SessionUtils;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ModelMetaService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ModelMetaService.class);
+
+  private static final ModelMetaService INSTANCE = new ModelMetaService();
+
+  public static ModelMetaService getInstance() {
+    return INSTANCE;
+  }
+
+  private ModelMetaService() {}
+
+  public List<ModelEntity> listModelsByNamespace(Namespace ns) {
+    NamespaceUtil.checkModel(ns);
+
+    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(ns);
+
+    List<ModelPO> modelPOs =
+        SessionUtils.getWithoutCommit(
+            ModelMetaMapper.class, mapper -> 
mapper.listModelPOsBySchemaId(schemaId));
+
+    return modelPOs.stream().map(m -> POConverters.fromModelPO(m, 
ns)).collect(Collectors.toList());
+  }
+
+  public ModelEntity getModelByIdentifier(NameIdentifier ident) {
+    NameIdentifierUtil.checkModel(ident);
+
+    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(ident.namespace());
+
+    ModelPO modelPO =
+        SessionUtils.getWithoutCommit(
+            ModelMetaMapper.class,
+            mapper -> mapper.selectModelMetaBySchemaIdAndModelName(schemaId, 
ident.name()));
+
+    if (modelPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.MODEL.name().toLowerCase(Locale.ROOT),
+          ident.toString());
+    }
+
+    return POConverters.fromModelPO(modelPO, ident.namespace());
+  }
+
+  public void insertModel(ModelEntity modelEntity, boolean overwrite) throws 
IOException {
+    NameIdentifierUtil.checkModel(modelEntity.nameIdentifier());
+
+    try {
+      ModelPO.Builder builder = ModelPO.builder();
+      fillModelPOBuilderParentEntityId(builder, modelEntity.namespace());
+
+      SessionUtils.doWithCommit(
+          ModelMetaMapper.class,
+          mapper -> {
+            ModelPO po = POConverters.initializeModelPO(modelEntity, builder);
+            if (overwrite) {
+              mapper.insertModelMetaOnDuplicateKeyUpdate(po);
+            } else {
+              mapper.insertModelMeta(po);
+            }
+          });
+    } catch (RuntimeException re) {
+      ExceptionUtils.checkSQLException(
+          re, Entity.EntityType.MODEL, 
modelEntity.nameIdentifier().toString());
+      throw re;
+    }
+  }
+
+  public boolean deleteModel(NameIdentifier ident) {
+    NameIdentifierUtil.checkModel(ident);
+
+    Long schemaId;
+    try {
+      schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(ident.namespace());
+    } catch (NoSuchEntityException e) {
+      LOG.warn("Failed to delete model: {}", ident, e);
+      return false;
+    }
+
+    AtomicInteger modelDeletedCount = new AtomicInteger();
+    SessionUtils.doMultipleWithCommit(
+        () ->
+            modelDeletedCount.set(
+                SessionUtils.doWithoutCommitAndFetchResult(
+                    ModelMetaMapper.class,
+                    mapper ->
+                        
mapper.softDeleteModelMetaBySchemaIdAndModelName(schemaId, ident.name())))
+        // TODO(jerryshao): Add delete model version
+        );
+
+    return modelDeletedCount.get() > 0;
+  }
+
+  public int deleteModelMetasByLegacyTimeline(Long legacyTimeline, int limit) {
+    return SessionUtils.doWithCommitAndFetchResult(
+        ModelMetaMapper.class,
+        mapper -> mapper.deleteModelMetasByLegacyTimeline(legacyTimeline, 
limit));
+  }
+
+  Long getModelIdBySchemaIdAndModelName(Long schemaId, String modelName) {
+    Long modelId =
+        SessionUtils.getWithoutCommit(
+            ModelMetaMapper.class,
+            mapper -> mapper.selectModelIdBySchemaIdAndModelName(schemaId, 
modelName));
+
+    if (modelId == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.MODEL.name().toLowerCase(Locale.ROOT),
+          modelName);
+    }
+
+    return modelId;
+  }
+
+  ModelPO getModelPOById(Long modelId) {
+    ModelPO modelPO =
+        SessionUtils.getWithoutCommit(
+            ModelMetaMapper.class, mapper -> 
mapper.selectModelMetaByModelId(modelId));
+
+    if (modelPO == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.MODEL.name().toLowerCase(Locale.ROOT),
+          modelId.toString());
+    }
+
+    return modelPO;
+  }
+
+  private void fillModelPOBuilderParentEntityId(ModelPO.Builder builder, 
Namespace ns) {
+    NamespaceUtil.checkModel(ns);
+    String metalake = ns.level(0);
+    String catalog = ns.level(1);
+    String schema = ns.level(2);
+
+    Long metalakeId = 
MetalakeMetaService.getInstance().getMetalakeIdByName(metalake);
+    builder.withMetalakeId(metalakeId);
+
+    Long catalogId =
+        
CatalogMetaService.getInstance().getCatalogIdByMetalakeIdAndName(metalakeId, 
catalog);
+    builder.withCatalogId(catalogId);
+
+    Long schemaId =
+        
SchemaMetaService.getInstance().getSchemaIdByCatalogIdAndName(catalogId, 
schema);
+    builder.withSchemaId(schemaId);
+  }
+}
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index 5d3fa1b4f..1229e3165 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -31,10 +31,12 @@ import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NonEmptyEntityException;
 import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.storage.relational.mapper.FilesetMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SecurableObjectMapper;
@@ -226,7 +228,11 @@ public class SchemaMetaService {
             () ->
                 SessionUtils.doWithoutCommit(
                     TagMetadataObjectRelMapper.class,
-                    mapper -> 
mapper.softDeleteTagMetadataObjectRelsBySchemaId(schemaId)));
+                    mapper -> 
mapper.softDeleteTagMetadataObjectRelsBySchemaId(schemaId)),
+            () ->
+                SessionUtils.doWithoutCommit(
+                    ModelMetaMapper.class,
+                    mapper -> 
mapper.softDeleteModelMetasBySchemaId(schemaId)));
       } else {
         List<TableEntity> tableEntities =
             TableMetaService.getInstance()
@@ -250,6 +256,18 @@ public class SchemaMetaService {
           throw new NonEmptyEntityException(
               "Entity %s has sub-entities, you should remove sub-entities 
first", identifier);
         }
+        List<ModelEntity> modelEntities =
+            ModelMetaService.getInstance()
+                .listModelsByNamespace(
+                    NamespaceUtil.ofModel(
+                        identifier.namespace().level(0),
+                        identifier.namespace().level(1),
+                        schemaName));
+        if (!modelEntities.isEmpty()) {
+          throw new NonEmptyEntityException(
+              "Entity %s has sub-entities, you should remove sub-entities 
first", identifier);
+        }
+
         SessionUtils.doMultipleWithCommit(
             () ->
                 SessionUtils.doWithoutCommit(
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
index 4fe53dba3..8bc7394d4 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/session/SqlSessionFactoryHelper.java
@@ -33,6 +33,7 @@ import 
org.apache.gravitino.storage.relational.mapper.FilesetVersionMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.GroupRoleRelMapper;
 import org.apache.gravitino.storage.relational.mapper.MetalakeMetaMapper;
+import org.apache.gravitino.storage.relational.mapper.ModelMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.OwnerMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.RoleMetaMapper;
 import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
@@ -124,6 +125,7 @@ public class SqlSessionFactoryHelper {
     configuration.addMapper(TagMetaMapper.class);
     configuration.addMapper(TagMetadataObjectRelMapper.class);
     configuration.addMapper(OwnerMetaMapper.class);
+    configuration.addMapper(ModelMetaMapper.class);
 
     // Create the SqlSessionFactory object, it is a singleton object
     if (sqlSessionFactory == null) {
diff --git 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
index 4cccd0675..0bd0f4a74 100644
--- 
a/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
+++ 
b/core/src/main/java/org/apache/gravitino/storage/relational/utils/POConverters.java
@@ -45,6 +45,7 @@ import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
@@ -64,6 +65,7 @@ import 
org.apache.gravitino.storage.relational.po.FilesetVersionPO;
 import org.apache.gravitino.storage.relational.po.GroupPO;
 import org.apache.gravitino.storage.relational.po.GroupRoleRelPO;
 import org.apache.gravitino.storage.relational.po.MetalakePO;
+import org.apache.gravitino.storage.relational.po.ModelPO;
 import org.apache.gravitino.storage.relational.po.OwnerRelPO;
 import org.apache.gravitino.storage.relational.po.RolePO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
@@ -1287,4 +1289,39 @@ public class POConverters {
       throw new RuntimeException("Failed to serialize json object:", e);
     }
   }
+
+  public static ModelEntity fromModelPO(ModelPO modelPO, Namespace namespace) {
+    try {
+      return ModelEntity.builder()
+          .withId(modelPO.getModelId())
+          .withName(modelPO.getModelName())
+          .withNamespace(namespace)
+          .withComment(modelPO.getModelComment())
+          .withLatestVersion(modelPO.getModelLatestVersion())
+          .withProperties(
+              
JsonUtils.anyFieldMapper().readValue(modelPO.getModelProperties(), Map.class))
+          .withAuditInfo(
+              JsonUtils.anyFieldMapper().readValue(modelPO.getAuditInfo(), 
AuditInfo.class))
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to deserialize json object:", e);
+    }
+  }
+
+  public static ModelPO initializeModelPO(ModelEntity modelEntity, 
ModelPO.Builder builder) {
+    try {
+      return builder
+          .withModelId(modelEntity.id())
+          .withModelName(modelEntity.name())
+          .withModelComment(modelEntity.comment())
+          .withModelLatestVersion(modelEntity.latestVersion())
+          .withModelProperties(
+              
JsonUtils.anyFieldMapper().writeValueAsString(modelEntity.properties()))
+          
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(modelEntity.auditInfo()))
+          .withDeletedAt(DEFAULT_DELETED_AT)
+          .build();
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize json object:", e);
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java 
b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
index 01d919676..b0f516e50 100644
--- a/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
+++ b/core/src/test/java/org/apache/gravitino/storage/TestEntityStorage.java
@@ -73,6 +73,7 @@ import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
@@ -81,6 +82,7 @@ import org.apache.gravitino.meta.TopicEntity;
 import org.apache.gravitino.meta.UserEntity;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
 import org.apache.gravitino.storage.relational.converters.H2ExceptionConverter;
 import 
org.apache.gravitino.storage.relational.converters.MySQLExceptionConverter;
 import 
org.apache.gravitino.storage.relational.converters.PostgreSQLExceptionConverter;
@@ -272,6 +274,15 @@ public class TestEntityStorage {
               Namespace.of("metalake", "catalog", "schema1"),
               "topic1",
               auditInfo);
+      ModelEntity model1 =
+          TestJDBCBackend.createModelEntity(
+              RandomIdGenerator.INSTANCE.nextId(),
+              Namespace.of("metalake", "catalog", "schema1"),
+              "model1",
+              "model1",
+              1,
+              null,
+              auditInfo);
       UserEntity user1 =
           createUser(RandomIdGenerator.INSTANCE.nextId(), "metalake", "user1", 
auditInfo);
       GroupEntity group1 =
@@ -287,6 +298,7 @@ public class TestEntityStorage {
       store.put(table1);
       store.put(fileset1);
       store.put(topic1);
+      store.put(model1);
       store.put(user1);
       store.put(group1);
       store.put(role1);
@@ -325,6 +337,12 @@ public class TestEntityStorage {
                   NameIdentifier.of("metalake", "catalog", "schema1", 
"topic1"),
                   Entity.EntityType.TOPIC,
                   TopicEntity.class));
+      Assertions.assertDoesNotThrow(
+          () ->
+              store.get(
+                  NameIdentifier.of("metalake", "catalog", "schema1", 
"model1"),
+                  Entity.EntityType.MODEL,
+                  ModelEntity.class));
 
       Assertions.assertDoesNotThrow(
           () ->
@@ -386,6 +404,13 @@ public class TestEntityStorage {
                   NameIdentifier.of("metalake", "catalog", "schema1", 
"topic1"),
                   Entity.EntityType.TOPIC,
                   TopicEntity.class));
+      Assertions.assertDoesNotThrow(
+          () ->
+              store.get(
+                  NameIdentifier.of("metalake", "catalog", "schema1", 
"model1"),
+                  Entity.EntityType.MODEL,
+                  ModelEntity.class));
+
       Assertions.assertDoesNotThrow(
           () ->
               store.get(
@@ -595,6 +620,15 @@ public class TestEntityStorage {
       TopicEntity topic1 =
           createTopicEntity(
               1L, Namespace.of("metalake", "catalog", "schema1"), "topic1", 
auditInfo);
+      ModelEntity model1 =
+          TestJDBCBackend.createModelEntity(
+              RandomIdGenerator.INSTANCE.nextId(),
+              Namespace.of("metalake", "catalog", "schema1"),
+              "model1",
+              "model1",
+              1,
+              null,
+              auditInfo);
 
       SchemaEntity schema2 =
           createSchemaEntity(2L, Namespace.of("metalake", "catalog"), 
"schema2", auditInfo);
@@ -617,6 +651,16 @@ public class TestEntityStorage {
       TopicEntity topic1InSchema2 =
           createTopicEntity(
               2L, Namespace.of("metalake", "catalog", "schema2"), "topic1", 
auditInfo);
+      ModelEntity model1InSchema2 =
+          TestJDBCBackend.createModelEntity(
+              RandomIdGenerator.INSTANCE.nextId(),
+              Namespace.of("metalake", "catalog", "schema2"),
+              "model1",
+              "model1",
+              1,
+              null,
+              auditInfo);
+
       UserEntity user1 = createUser(1L, "metalake", "user1", auditInfo);
       UserEntity user2 = createUser(2L, "metalake", "user2", auditInfo);
       GroupEntity group1 = createGroup(1L, "metalake", "group1", auditInfo);
@@ -636,6 +680,8 @@ public class TestEntityStorage {
       store.put(fileset1InSchema2);
       store.put(topic1);
       store.put(topic1InSchema2);
+      store.put(model1);
+      store.put(model1InSchema2);
       store.put(user1);
       store.put(user2);
       store.put(group1);
@@ -656,6 +702,8 @@ public class TestEntityStorage {
           fileset1InSchema2,
           topic1,
           topic1InSchema2,
+          model1,
+          model1InSchema2,
           user1,
           user2,
           group1,
@@ -675,7 +723,9 @@ public class TestEntityStorage {
 
       validateDeleteTopic(store, schema2, topic1, topic1InSchema2);
 
-      validateDeleteSchema(store, schema1, table1, fileset1, topic1);
+      validateDeleteModel(store, schema2, model1, model1InSchema2);
+
+      validateDeleteSchema(store, schema1, table1, fileset1, topic1, model1);
 
       validateDeleteCatalog(
           store,
@@ -687,7 +737,9 @@ public class TestEntityStorage {
           fileset1,
           fileset1InSchema2,
           topic1,
-          topic1InSchema2);
+          topic1InSchema2,
+          model1,
+          model1InSchema2);
 
       validateDeleteMetalake(store, metalake, catalogCopy, user2, group2, 
role2);
 
@@ -774,6 +826,19 @@ public class TestEntityStorage {
               topic1InSchema2.name(),
               topic1InSchema2.auditInfo());
       store.put(topic1InSchema2New);
+
+      // model
+      ModelEntity model1New =
+          TestJDBCBackend.createModelEntity(
+              RandomIdGenerator.INSTANCE.nextId(),
+              model1.namespace(),
+              model1.name(),
+              model1.comment(),
+              model1.latestVersion(),
+              model1.properties(),
+              model1.auditInfo());
+      store.put(model1New);
+
       UserEntity userNew =
           createUser(RandomIdGenerator.INSTANCE.nextId(), "metalake", 
"userNew", auditInfo);
       store.put(userNew);
@@ -790,7 +855,9 @@ public class TestEntityStorage {
 
       validateDeleteTopicCascade(store, topic1New);
 
-      validateDeleteSchemaCascade(store, schema1New, table1New, fileset1New, 
topic1New);
+      validateDeleteModelCascade(store, model1New);
+
+      validateDeleteSchemaCascade(store, schema1New, table1New, fileset1New, 
topic1New, model1New);
 
       validateDeleteCatalogCascade(store, catalogNew, schema2New);
 
@@ -838,12 +905,23 @@ public class TestEntityStorage {
       TopicEntity topicEntity1 =
           createTopicEntity(RandomIdGenerator.INSTANCE.nextId(), namespace, 
"sameName", auditInfo);
 
+      ModelEntity model1 =
+          TestJDBCBackend.createModelEntity(
+              RandomIdGenerator.INSTANCE.nextId(),
+              namespace,
+              "sameName",
+              "model1",
+              1,
+              null,
+              auditInfo);
+
       store.put(metalake1);
       store.put(catalog1);
       store.put(schema1);
       store.put(table1);
       store.put(filesetEntity1);
       store.put(topicEntity1);
+      store.put(model1);
 
       NameIdentifier identifier = NameIdentifier.of("metalake1", "catalog1", 
"schema1", "sameName");
 
@@ -856,12 +934,16 @@ public class TestEntityStorage {
       TopicEntity loadedTopicEntity =
           store.get(identifier, Entity.EntityType.TOPIC, TopicEntity.class);
       Assertions.assertEquals(topicEntity1.id(), loadedTopicEntity.id());
+      ModelEntity loadedModelEntity =
+          store.get(identifier, Entity.EntityType.MODEL, ModelEntity.class);
+      Assertions.assertEquals(model1.id(), loadedModelEntity.id());
 
       // Remove table will not affect another
       Assertions.assertTrue(store.delete(identifier, Entity.EntityType.TABLE));
       Assertions.assertNotNull(
           store.get(identifier, Entity.EntityType.FILESET, 
FilesetEntity.class));
       Assertions.assertNotNull(store.get(identifier, Entity.EntityType.TOPIC, 
TopicEntity.class));
+      Assertions.assertNotNull(store.get(identifier, Entity.EntityType.MODEL, 
ModelEntity.class));
 
       // JDBC use id as the primary key, so we need to change the id of table1 
if we want to store
       // it again
@@ -873,6 +955,7 @@ public class TestEntityStorage {
       store.delete(identifier, Entity.EntityType.FILESET);
       Assertions.assertNotNull(store.get(identifier, Entity.EntityType.TABLE, 
TableEntity.class));
       Assertions.assertNotNull(store.get(identifier, Entity.EntityType.TOPIC, 
TopicEntity.class));
+      Assertions.assertNotNull(store.get(identifier, Entity.EntityType.MODEL, 
ModelEntity.class));
 
       filesetEntity1 =
           createFilesetEntity(
@@ -884,6 +967,7 @@ public class TestEntityStorage {
       Assertions.assertNotNull(store.get(identifier, Entity.EntityType.TABLE, 
TableEntity.class));
       Assertions.assertNotNull(
           store.get(identifier, Entity.EntityType.FILESET, 
FilesetEntity.class));
+      Assertions.assertNotNull(store.get(identifier, Entity.EntityType.MODEL, 
ModelEntity.class));
 
       topicEntity1 =
           createTopicEntity(RandomIdGenerator.INSTANCE.nextId(), namespace, 
"sameName", auditInfo);
@@ -899,9 +983,12 @@ public class TestEntityStorage {
 
       NameIdentifier changedNameIdentifier =
           NameIdentifier.of("metalake1", "catalog1", "schema1", 
"sameNameChanged");
-      store.get(changedNameIdentifier, Entity.EntityType.TABLE, 
TableEntity.class);
-      store.get(identifier, Entity.EntityType.FILESET, FilesetEntity.class);
-      store.get(identifier, Entity.EntityType.TOPIC, TopicEntity.class);
+      Assertions.assertNotNull(
+          store.get(changedNameIdentifier, Entity.EntityType.TABLE, 
TableEntity.class));
+      Assertions.assertNotNull(
+          store.get(identifier, Entity.EntityType.FILESET, 
FilesetEntity.class));
+      Assertions.assertNotNull(store.get(identifier, Entity.EntityType.TOPIC, 
TopicEntity.class));
+      Assertions.assertNotNull(store.get(identifier, Entity.EntityType.MODEL, 
ModelEntity.class));
 
       table1 =
           createTableEntity(RandomIdGenerator.INSTANCE.nextId(), namespace, 
"sameName", auditInfo);
@@ -915,9 +1002,11 @@ public class TestEntityStorage {
           Entity.EntityType.FILESET,
           e -> createFilesetEntity(filesetId, namespace, "sameNameChanged", 
e.auditInfo()));
 
-      store.get(identifier, Entity.EntityType.TABLE, TableEntity.class);
-      store.get(changedNameIdentifier, Entity.EntityType.FILESET, 
FilesetEntity.class);
-      store.get(identifier, Entity.EntityType.TOPIC, TopicEntity.class);
+      Assertions.assertNotNull(store.get(identifier, Entity.EntityType.TABLE, 
TableEntity.class));
+      Assertions.assertNotNull(
+          store.get(changedNameIdentifier, Entity.EntityType.FILESET, 
FilesetEntity.class));
+      Assertions.assertNotNull(store.get(identifier, Entity.EntityType.TOPIC, 
TopicEntity.class));
+      Assertions.assertNotNull(store.get(identifier, Entity.EntityType.MODEL, 
ModelEntity.class));
 
       filesetEntity1 =
           createFilesetEntity(
@@ -932,9 +1021,12 @@ public class TestEntityStorage {
           Entity.EntityType.TOPIC,
           e -> createTopicEntity(topicId, namespace, "sameNameChanged", 
e.auditInfo()));
 
-      store.get(identifier, Entity.EntityType.TABLE, TableEntity.class);
-      store.get(identifier, Entity.EntityType.FILESET, FilesetEntity.class);
-      store.get(changedNameIdentifier, Entity.EntityType.TOPIC, 
TopicEntity.class);
+      Assertions.assertNotNull(store.get(identifier, Entity.EntityType.TABLE, 
TableEntity.class));
+      Assertions.assertNotNull(
+          store.get(identifier, Entity.EntityType.FILESET, 
FilesetEntity.class));
+      Assertions.assertNotNull(
+          store.get(changedNameIdentifier, Entity.EntityType.TOPIC, 
TopicEntity.class));
+      Assertions.assertNotNull(store.get(identifier, Entity.EntityType.MODEL, 
ModelEntity.class));
 
       destroy(type);
     }
@@ -1354,6 +1446,15 @@ public class TestEntityStorage {
     Assertions.assertFalse(store.delete(topic1.nameIdentifier(), 
Entity.EntityType.TOPIC));
   }
 
+  private void validateDeleteModelCascade(EntityStore store, ModelEntity 
model1)
+      throws IOException {
+    // Delete the topic 'metalake.catalog.schema1.topic1'
+    Assertions.assertTrue(store.delete(model1.nameIdentifier(), 
EntityType.MODEL));
+    Assertions.assertFalse(store.exists(model1.nameIdentifier(), 
EntityType.MODEL));
+    // Delete again should return false
+    Assertions.assertFalse(store.delete(model1.nameIdentifier(), 
EntityType.MODEL));
+  }
+
   private void validateDeleteFilesetCascade(EntityStore store, FilesetEntity 
fileset1)
       throws IOException {
     // Delete the fileset 'metalake.catalog.schema1.fileset1'
@@ -1469,7 +1570,8 @@ public class TestEntityStorage {
       SchemaEntity schema1,
       TableEntity table1,
       FilesetEntity fileset1,
-      TopicEntity topic1)
+      TopicEntity topic1,
+      ModelEntity model1)
       throws IOException {
     TableEntity table1New =
         createTableEntityWithColumns(
@@ -1494,6 +1596,17 @@ public class TestEntityStorage {
             topic1.auditInfo());
     store.put(topic1New);
 
+    ModelEntity model1New =
+        TestJDBCBackend.createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            model1.namespace(),
+            model1.name(),
+            model1.comment(),
+            model1.latestVersion(),
+            model1.properties(),
+            model1.auditInfo());
+    store.put(model1New);
+
     Assertions.assertThrowsExactly(
         NonEmptyEntityException.class,
         () -> store.delete(schema1.nameIdentifier(), 
Entity.EntityType.SCHEMA));
@@ -1527,6 +1640,10 @@ public class TestEntityStorage {
     Assertions.assertThrows(
         NoSuchEntityException.class,
         () -> store.get(topic1.nameIdentifier(), Entity.EntityType.TOPIC, 
TopicEntity.class));
+
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> store.get(model1.nameIdentifier(), Entity.EntityType.MODEL, 
ModelEntity.class));
   }
 
   private void validateDeleteMetalake(
@@ -1567,7 +1684,9 @@ public class TestEntityStorage {
       FilesetEntity fileset1,
       FilesetEntity fileset1InSchema2,
       TopicEntity topic1,
-      TopicEntity topic1InSchema2)
+      TopicEntity topic1InSchema2,
+      ModelEntity model1,
+      ModelEntity model1InSchema2)
       throws IOException {
     // Now try to delete all schemas under catalog;
     Assertions.assertThrowsExactly(
@@ -1577,6 +1696,8 @@ public class TestEntityStorage {
     validateDeletedColumns(table1.id(), table1.type());
     store.delete(fileset1.nameIdentifier(), Entity.EntityType.FILESET);
     store.delete(topic1.nameIdentifier(), Entity.EntityType.TOPIC);
+    store.delete(model1.nameIdentifier(), Entity.EntityType.MODEL);
+
     try {
       Thread.sleep(1000);
     } catch (InterruptedException e) {
@@ -1588,6 +1709,7 @@ public class TestEntityStorage {
     Assertions.assertFalse(
         store.exists(fileset1InSchema2.nameIdentifier(), 
Entity.EntityType.FILESET));
     Assertions.assertFalse(store.exists(topic1InSchema2.nameIdentifier(), 
Entity.EntityType.TOPIC));
+    Assertions.assertFalse(store.exists(model1InSchema2.nameIdentifier(), 
Entity.EntityType.MODEL));
     store.delete(schema2.nameIdentifier(), Entity.EntityType.SCHEMA);
 
     store.delete(catalog.nameIdentifier(), Entity.EntityType.CATALOG);
@@ -1601,7 +1723,8 @@ public class TestEntityStorage {
       SchemaEntity schema1,
       TableEntity table1,
       FilesetEntity fileset1,
-      TopicEntity topic1)
+      TopicEntity topic1,
+      ModelEntity model1)
       throws IOException {
     // Delete the schema 'metalake.catalog.schema1' but failed, because it ha 
sub-entities;
     NonEmptyEntityException exception =
@@ -1614,26 +1737,30 @@ public class TestEntityStorage {
     // has not been deleted yet;
     Assertions.assertTrue(store.exists(schema1.nameIdentifier(), 
Entity.EntityType.SCHEMA));
     Assertions.assertTrue(store.exists(table1.nameIdentifier(), 
Entity.EntityType.TABLE));
-    ;
+
     Assertions.assertTrue(store.exists(fileset1.nameIdentifier(), 
Entity.EntityType.FILESET));
     Assertions.assertTrue(store.exists(topic1.nameIdentifier(), 
Entity.EntityType.TOPIC));
+    Assertions.assertTrue(store.exists(model1.nameIdentifier(), 
Entity.EntityType.MODEL));
 
     // Delete table1,fileset1 and schema1
     Assertions.assertTrue(store.delete(table1.nameIdentifier(), 
Entity.EntityType.TABLE));
     validateDeletedColumns(table1.id(), table1.type());
     Assertions.assertTrue(store.delete(fileset1.nameIdentifier(), 
Entity.EntityType.FILESET));
     Assertions.assertTrue(store.delete(topic1.nameIdentifier(), 
Entity.EntityType.TOPIC));
+    Assertions.assertTrue(store.delete(model1.nameIdentifier(), 
Entity.EntityType.MODEL));
     Assertions.assertTrue(store.delete(schema1.nameIdentifier(), 
Entity.EntityType.SCHEMA));
     // Make sure table1, fileset1 in 'metalake.catalog.schema1' can't be 
access;
     Assertions.assertFalse(store.exists(table1.nameIdentifier(), 
Entity.EntityType.TABLE));
     Assertions.assertFalse(store.exists(fileset1.nameIdentifier(), 
Entity.EntityType.FILESET));
     Assertions.assertFalse(store.exists(topic1.nameIdentifier(), 
Entity.EntityType.TOPIC));
+    Assertions.assertFalse(store.exists(model1.nameIdentifier(), 
Entity.EntityType.MODEL));
     Assertions.assertFalse(store.exists(schema1.nameIdentifier(), 
Entity.EntityType.SCHEMA));
 
     // Delete again should return false
     Assertions.assertFalse(store.delete(table1.nameIdentifier(), 
Entity.EntityType.TABLE));
     Assertions.assertFalse(store.delete(fileset1.nameIdentifier(), 
Entity.EntityType.FILESET));
     Assertions.assertFalse(store.delete(topic1.nameIdentifier(), 
Entity.EntityType.TOPIC));
+    Assertions.assertFalse(store.delete(model1.nameIdentifier(), 
Entity.EntityType.MODEL));
     Assertions.assertFalse(store.delete(schema1.nameIdentifier(), 
Entity.EntityType.SCHEMA));
 
     // Now we re-insert schema1, table1, fileset1 and topic1, and everything 
should be OK
@@ -1666,6 +1793,17 @@ public class TestEntityStorage {
             topic1.auditInfo());
     store.put(topic1New);
 
+    ModelEntity model1New =
+        TestJDBCBackend.createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            model1.namespace(),
+            model1.name(),
+            model1.comment(),
+            model1.latestVersion(),
+            model1.properties(),
+            model1.auditInfo());
+    store.put(model1New);
+
     Assertions.assertEquals(
         schema1New,
         store.get(schema1.nameIdentifier(), Entity.EntityType.SCHEMA, 
SchemaEntity.class));
@@ -1676,6 +1814,8 @@ public class TestEntityStorage {
         store.get(fileset1.nameIdentifier(), Entity.EntityType.FILESET, 
FilesetEntity.class));
     Assertions.assertEquals(
         topic1New, store.get(topic1.nameIdentifier(), Entity.EntityType.TOPIC, 
TopicEntity.class));
+    Assertions.assertEquals(
+        model1New, store.get(model1.nameIdentifier(), Entity.EntityType.MODEL, 
ModelEntity.class));
   }
 
   private void validateDeleteUser(EntityStore store, UserEntity user1) throws 
IOException {
@@ -1745,6 +1885,21 @@ public class TestEntityStorage {
     Assertions.assertTrue(store.exists(table1InSchema2.nameIdentifier(), 
Entity.EntityType.TABLE));
   }
 
+  private void validateDeleteModel(
+      EntityStore store, SchemaEntity schema2, ModelEntity model1, ModelEntity 
model1InSchema2)
+      throws IOException {
+    Assertions.assertTrue(store.delete(model1InSchema2.nameIdentifier(), 
Entity.EntityType.MODEL));
+    Assertions.assertFalse(store.exists(model1InSchema2.nameIdentifier(), 
Entity.EntityType.MODEL));
+    // delete again should return false
+    Assertions.assertFalse(store.delete(model1InSchema2.nameIdentifier(), 
Entity.EntityType.MODEL));
+
+    Assertions.assertEquals(
+        model1, store.get(model1.nameIdentifier(), Entity.EntityType.MODEL, 
ModelEntity.class));
+    // Make sure schema 'metalake.catalog.schema2' still exist;
+    Assertions.assertEquals(
+        schema2, store.get(schema2.nameIdentifier(), Entity.EntityType.SCHEMA, 
SchemaEntity.class));
+  }
+
   private static void validateAllEntityExist(
       BaseMetalake metalake,
       EntityStore store,
@@ -1758,6 +1913,8 @@ public class TestEntityStorage {
       FilesetEntity fileset1InSchema2,
       TopicEntity topic1,
       TopicEntity topic1InSchema2,
+      ModelEntity model1,
+      ModelEntity model1InSchema2,
       UserEntity user1,
       UserEntity user2,
       GroupEntity group1,
@@ -1796,6 +1953,11 @@ public class TestEntityStorage {
     Assertions.assertEquals(
         topic1InSchema2,
         store.get(topic1InSchema2.nameIdentifier(), Entity.EntityType.TOPIC, 
TopicEntity.class));
+    Assertions.assertEquals(
+        model1, store.get(model1.nameIdentifier(), Entity.EntityType.MODEL, 
ModelEntity.class));
+    Assertions.assertEquals(
+        model1InSchema2,
+        store.get(model1InSchema2.nameIdentifier(), Entity.EntityType.MODEL, 
ModelEntity.class));
     Assertions.assertEquals(
         user1, store.get(user1.nameIdentifier(), Entity.EntityType.USER, 
UserEntity.class));
     Assertions.assertEquals(
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
index 01b1cac96..739edba96 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackend.java
@@ -27,6 +27,7 @@ import static 
org.apache.gravitino.Configs.ENTITY_RELATIONAL_STORE;
 import static org.apache.gravitino.Configs.ENTITY_STORE;
 import static org.apache.gravitino.Configs.RELATIONAL_ENTITY_STORE;
 import static org.apache.gravitino.SupportsRelationOperations.Type.OWNER_REL;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -67,6 +68,7 @@ import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.FilesetEntity;
 import org.apache.gravitino.meta.GroupEntity;
+import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.RoleEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
@@ -297,6 +299,29 @@ public class TestJDBCBackend {
             auditInfo);
     backend.insert(topic, false);
     assertThrows(EntityAlreadyExistsException.class, () -> 
backend.insert(topicCopy, false));
+
+    ModelEntity model =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofModel("metalake", "catalog", "schema"),
+            "model",
+            "model comment",
+            1,
+            ImmutableMap.of("key", "value"),
+            auditInfo);
+    ModelEntity modelCopy =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofModel("metalake", "catalog", "schema"),
+            "model",
+            "model comment",
+            1,
+            ImmutableMap.of("key", "value"),
+            auditInfo);
+
+    assertDoesNotThrow(() -> backend.insert(model, false));
+    assertThrows(EntityAlreadyExistsException.class, () -> 
backend.insert(modelCopy, false));
+    assertDoesNotThrow(() -> backend.insert(modelCopy, true));
   }
 
   @Test
@@ -575,6 +600,17 @@ public class TestJDBCBackend {
             auditInfo);
     backend.insert(topic, false);
 
+    ModelEntity model =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            NamespaceUtil.ofModel("metalake", "catalog", "schema"),
+            "model",
+            "model comment",
+            1,
+            ImmutableMap.of("key", "value"),
+            auditInfo);
+    backend.insert(model, false);
+
     // update fileset properties and version
     FilesetEntity filesetV2 =
         createFilesetEntity(
@@ -736,6 +772,9 @@ public class TestJDBCBackend {
     List<TopicEntity> topics = backend.list(topic.namespace(), 
Entity.EntityType.TOPIC, true);
     assertTrue(topics.contains(topic));
 
+    List<ModelEntity> models = backend.list(model.namespace(), 
Entity.EntityType.MODEL, true);
+    assertTrue(models.contains(model));
+
     RoleEntity roleEntity = backend.get(role.nameIdentifier(), 
Entity.EntityType.ROLE);
     assertEquals(role, roleEntity);
     assertEquals(1, 
RoleMetaService.getInstance().listRolesByUserId(user.id()).size());
@@ -830,6 +869,7 @@ public class TestJDBCBackend {
 
     assertFalse(backend.exists(table.nameIdentifier(), 
Entity.EntityType.TABLE));
     assertFalse(backend.exists(topic.nameIdentifier(), 
Entity.EntityType.TOPIC));
+    assertFalse(backend.exists(model.nameIdentifier(), 
Entity.EntityType.MODEL));
 
     assertFalse(backend.exists(role.nameIdentifier(), Entity.EntityType.ROLE));
     assertEquals(0, 
RoleMetaService.getInstance().listRolesByUserId(user.id()).size());
@@ -861,6 +901,7 @@ public class TestJDBCBackend {
     assertTrue(legacyRecordExistsInDB(schema.id(), Entity.EntityType.SCHEMA));
     assertTrue(legacyRecordExistsInDB(table.id(), Entity.EntityType.TABLE));
     assertTrue(legacyRecordExistsInDB(topic.id(), Entity.EntityType.TOPIC));
+    assertTrue(legacyRecordExistsInDB(model.id(), Entity.EntityType.MODEL));
     assertTrue(legacyRecordExistsInDB(fileset.id(), 
Entity.EntityType.FILESET));
     assertTrue(legacyRecordExistsInDB(role.id(), Entity.EntityType.ROLE));
     assertTrue(legacyRecordExistsInDB(user.id(), Entity.EntityType.USER));
@@ -883,6 +924,7 @@ public class TestJDBCBackend {
     assertFalse(legacyRecordExistsInDB(table.id(), Entity.EntityType.TABLE));
     assertFalse(legacyRecordExistsInDB(fileset.id(), 
Entity.EntityType.FILESET));
     assertFalse(legacyRecordExistsInDB(topic.id(), Entity.EntityType.TOPIC));
+    assertFalse(legacyRecordExistsInDB(model.id(), Entity.EntityType.MODEL));
     assertFalse(legacyRecordExistsInDB(role.id(), Entity.EntityType.ROLE));
     assertFalse(legacyRecordExistsInDB(user.id(), Entity.EntityType.USER));
     assertFalse(legacyRecordExistsInDB(group.id(), Entity.EntityType.GROUP));
@@ -937,6 +979,10 @@ public class TestJDBCBackend {
         tableName = "topic_meta";
         idColumnName = "topic_id";
         break;
+      case MODEL:
+        tableName = "model_meta";
+        idColumnName = "model_id";
+        break;
       case ROLE:
         tableName = "role_meta";
         idColumnName = "role_id";
@@ -1326,4 +1372,47 @@ public class TestJDBCBackend {
         .withSecurableObjects(securableObjects)
         .build();
   }
+
+  public static ModelEntity createModelEntity(
+      Long id,
+      Namespace namespace,
+      String name,
+      String comment,
+      Integer latestVersion,
+      Map<String, String> properties,
+      AuditInfo auditInfo) {
+    return ModelEntity.builder()
+        .withId(id)
+        .withName(name)
+        .withNamespace(namespace)
+        .withComment(comment)
+        .withLatestVersion(latestVersion)
+        .withProperties(properties)
+        .withAuditInfo(auditInfo)
+        .build();
+  }
+
+  protected void createParentEntities(
+      String metalakeName, String catalogName, String schemaName, AuditInfo 
auditInfo)
+      throws IOException {
+    BaseMetalake metalake =
+        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
+    backend.insert(metalake, false);
+
+    CatalogEntity catalog =
+        createCatalog(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(metalakeName),
+            catalogName,
+            auditInfo);
+    backend.insert(catalog, false);
+
+    SchemaEntity schema =
+        createSchemaEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            Namespace.of(metalakeName, catalog.name()),
+            schemaName,
+            auditInfo);
+    backend.insert(schema, false);
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestModelMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestModelMetaService.java
new file mode 100644
index 000000000..067edd5a2
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestModelMetaService.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.EntityAlreadyExistsException;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.TestJDBCBackend;
+import org.apache.gravitino.storage.relational.po.ModelPO;
+import org.apache.gravitino.storage.relational.utils.POConverters;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestModelMetaService extends TestJDBCBackend {
+
+  private static final String METALAKE_NAME = "metalake_for_model_meta_test";
+
+  private static final String CATALOG_NAME = "catalog_for_model_meta_test";
+
+  private static final String SCHEMA_NAME = "schema_for_model_meta_test";
+
+  private static final Namespace MODEL_NS = Namespace.of(METALAKE_NAME, 
CATALOG_NAME, SCHEMA_NAME);
+
+  private final AuditInfo auditInfo =
+      
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+
+  @Test
+  public void testInsertAndSelectModel() throws IOException {
+    createParentEntities(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, auditInfo);
+    Map<String, String> properties = ImmutableMap.of("k1", "v1");
+
+    ModelEntity modelEntity =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            MODEL_NS,
+            "model1",
+            "model1 comment",
+            0,
+            properties,
+            auditInfo);
+
+    Assertions.assertDoesNotThrow(
+        () -> ModelMetaService.getInstance().insertModel(modelEntity, false));
+
+    ModelEntity registeredModelEntity =
+        
ModelMetaService.getInstance().getModelByIdentifier(modelEntity.nameIdentifier());
+    Assertions.assertEquals(modelEntity, registeredModelEntity);
+
+    // Test insert again without overwrite
+    Assertions.assertThrows(
+        EntityAlreadyExistsException.class,
+        () -> ModelMetaService.getInstance().insertModel(modelEntity, false));
+
+    // Test insert again with overwrite
+    ModelEntity modelEntity2 =
+        createModelEntity(
+            modelEntity.id(),
+            modelEntity.namespace(),
+            "model2",
+            null,
+            modelEntity.latestVersion(),
+            null,
+            auditInfo);
+    Assertions.assertDoesNotThrow(
+        () -> ModelMetaService.getInstance().insertModel(modelEntity2, true));
+    ModelEntity registeredModelEntity2 =
+        
ModelMetaService.getInstance().getModelByIdentifier(modelEntity2.nameIdentifier());
+    Assertions.assertEquals(modelEntity2, registeredModelEntity2);
+
+    // Test get an in-existent model
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            ModelMetaService.getInstance()
+                .getModelByIdentifier(NameIdentifier.of(MODEL_NS, "model3")));
+
+    // Test get model by id
+    ModelPO modelPO = 
ModelMetaService.getInstance().getModelPOById(modelEntity.id());
+    Assertions.assertEquals(
+        modelEntity2, POConverters.fromModelPO(modelPO, 
modelEntity.namespace()));
+
+    // Test get in-existent model by id
+    Assertions.assertThrows(
+        NoSuchEntityException.class, () -> 
ModelMetaService.getInstance().getModelPOById(111L));
+
+    // Test get model id by name
+    Long schemaId = 
CommonMetaService.getInstance().getParentEntityIdByNamespace(MODEL_NS);
+    Long modelId =
+        
ModelMetaService.getInstance().getModelIdBySchemaIdAndModelName(schemaId, 
"model2");
+    Assertions.assertEquals(modelEntity2.id(), modelId);
+
+    // Test get in-existent model id by name
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> 
ModelMetaService.getInstance().getModelIdBySchemaIdAndModelName(schemaId, 
"model3"));
+  }
+
+  @Test
+  public void testInsertAndListModels() throws IOException {
+    createParentEntities(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, auditInfo);
+    Map<String, String> properties = ImmutableMap.of("k1", "v1");
+
+    ModelEntity modelEntity1 =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            MODEL_NS,
+            "model1",
+            "model1 comment",
+            0,
+            properties,
+            auditInfo);
+    ModelEntity modelEntity2 =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            MODEL_NS,
+            "model2",
+            "model2 comment",
+            0,
+            properties,
+            auditInfo);
+
+    Assertions.assertDoesNotThrow(
+        () -> ModelMetaService.getInstance().insertModel(modelEntity1, false));
+    Assertions.assertDoesNotThrow(
+        () -> ModelMetaService.getInstance().insertModel(modelEntity2, false));
+
+    List<ModelEntity> modelEntities =
+        ModelMetaService.getInstance().listModelsByNamespace(MODEL_NS);
+    Assertions.assertEquals(2, modelEntities.size());
+    Assertions.assertTrue(modelEntities.contains(modelEntity1));
+    Assertions.assertTrue(modelEntities.contains(modelEntity2));
+
+    // Test list models by in-existent namespace
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () ->
+            ModelMetaService.getInstance()
+                .listModelsByNamespace(Namespace.of(METALAKE_NAME, 
CATALOG_NAME, "inexistent")));
+  }
+
+  @Test
+  public void testInsertAndDeleteModel() throws IOException {
+    createParentEntities(METALAKE_NAME, CATALOG_NAME, SCHEMA_NAME, auditInfo);
+    Map<String, String> properties = ImmutableMap.of("k1", "v1");
+
+    ModelEntity modelEntity =
+        createModelEntity(
+            RandomIdGenerator.INSTANCE.nextId(),
+            MODEL_NS,
+            "model1",
+            "model1 comment",
+            0,
+            properties,
+            auditInfo);
+
+    Assertions.assertDoesNotThrow(
+        () -> ModelMetaService.getInstance().insertModel(modelEntity, false));
+
+    
Assertions.assertTrue(ModelMetaService.getInstance().deleteModel(modelEntity.nameIdentifier()));
+    Assertions.assertThrows(
+        NoSuchEntityException.class,
+        () -> 
ModelMetaService.getInstance().getModelByIdentifier(modelEntity.nameIdentifier()));
+
+    // Delete again should return false
+    Assertions.assertFalse(
+        
ModelMetaService.getInstance().deleteModel(modelEntity.nameIdentifier()));
+
+    // Test delete in-existent model
+    Assertions.assertFalse(
+        ModelMetaService.getInstance().deleteModel(NameIdentifier.of(MODEL_NS, 
"inexistent")));
+
+    // Test delete in-existent schema
+    Assertions.assertFalse(
+        ModelMetaService.getInstance()
+            .deleteModel(NameIdentifier.of(METALAKE_NAME, CATALOG_NAME, 
"inexistent", "model1")));
+  }
+}
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
index 30eb6bda6..a0e9f3427 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestTableColumnMetaService.java
@@ -28,10 +28,7 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.meta.AuditInfo;
-import org.apache.gravitino.meta.BaseMetalake;
-import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.ColumnEntity;
-import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.rel.expressions.literals.Literals;
 import org.apache.gravitino.rel.types.Types;
@@ -53,7 +50,7 @@ public class TestTableColumnMetaService extends 
TestJDBCBackend {
   public void testInsertAndGetTableColumns() throws IOException {
     String catalogName = "catalog1";
     String schemaName = "schema1";
-    createParentEntities(METALAKE_NAME, catalogName, schemaName);
+    createParentEntities(METALAKE_NAME, catalogName, schemaName, auditInfo);
 
     // Create a table entity without columns
     TableEntity createdTable =
@@ -154,7 +151,7 @@ public class TestTableColumnMetaService extends 
TestJDBCBackend {
   public void testUpdateTable() throws IOException {
     String catalogName = "catalog1";
     String schemaName = "schema1";
-    createParentEntities(METALAKE_NAME, catalogName, schemaName);
+    createParentEntities(METALAKE_NAME, catalogName, schemaName, auditInfo);
 
     // Create a table entity without columns
     TableEntity createdTable =
@@ -331,7 +328,7 @@ public class TestTableColumnMetaService extends 
TestJDBCBackend {
   public void testCreateAndDeleteTable() throws IOException {
     String catalogName = "catalog1";
     String schemaName = "schema1";
-    createParentEntities(METALAKE_NAME, catalogName, schemaName);
+    createParentEntities(METALAKE_NAME, catalogName, schemaName, auditInfo);
 
     // Create a table entity with column
     ColumnEntity column =
@@ -378,7 +375,7 @@ public class TestTableColumnMetaService extends 
TestJDBCBackend {
   public void testDeleteMetalake() throws IOException {
     String catalogName = "catalog1";
     String schemaName = "schema1";
-    createParentEntities(METALAKE_NAME, catalogName, schemaName);
+    createParentEntities(METALAKE_NAME, catalogName, schemaName, auditInfo);
 
     // Create a table entity with column
     ColumnEntity column =
@@ -425,7 +422,7 @@ public class TestTableColumnMetaService extends 
TestJDBCBackend {
   public void testGetColumnIdAndPO() throws IOException {
     String catalogName = "catalog1";
     String schemaName = "schema1";
-    createParentEntities(METALAKE_NAME, catalogName, schemaName);
+    createParentEntities(METALAKE_NAME, catalogName, schemaName, auditInfo);
 
     // Create a table entity with column
     ColumnEntity column =
@@ -550,27 +547,4 @@ public class TestTableColumnMetaService extends 
TestJDBCBackend {
           Assertions.assertEquals(expectedColumn.auditInfo(), 
column.auditInfo());
         });
   }
-
-  private void createParentEntities(String metalakeName, String catalogName, 
String schemaName)
-      throws IOException {
-    BaseMetalake metalake =
-        createBaseMakeLake(RandomIdGenerator.INSTANCE.nextId(), metalakeName, 
auditInfo);
-    backend.insert(metalake, false);
-
-    CatalogEntity catalog =
-        createCatalog(
-            RandomIdGenerator.INSTANCE.nextId(),
-            Namespace.of(metalakeName),
-            catalogName,
-            auditInfo);
-    backend.insert(catalog, false);
-
-    SchemaEntity schema =
-        createSchemaEntity(
-            RandomIdGenerator.INSTANCE.nextId(),
-            Namespace.of(metalakeName, catalog.name()),
-            schemaName,
-            auditInfo);
-    backend.insert(schema, false);
-  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
 
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
index 76a2c35d3..703b79e70 100644
--- 
a/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
+++ 
b/core/src/test/java/org/apache/gravitino/storage/relational/utils/TestPOConverters.java
@@ -46,6 +46,7 @@ import org.apache.gravitino.meta.BaseMetalake;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.ColumnEntity;
 import org.apache.gravitino.meta.FilesetEntity;
+import org.apache.gravitino.meta.ModelEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.meta.TableEntity;
@@ -60,6 +61,7 @@ import org.apache.gravitino.storage.relational.po.ColumnPO;
 import org.apache.gravitino.storage.relational.po.FilesetPO;
 import org.apache.gravitino.storage.relational.po.FilesetVersionPO;
 import org.apache.gravitino.storage.relational.po.MetalakePO;
+import org.apache.gravitino.storage.relational.po.ModelPO;
 import org.apache.gravitino.storage.relational.po.OwnerRelPO;
 import org.apache.gravitino.storage.relational.po.SchemaPO;
 import org.apache.gravitino.storage.relational.po.TablePO;
@@ -832,6 +834,160 @@ public class TestPOConverters {
     assertEquals(0, ownerRelPO.getDeletedAt());
   }
 
+  @Test
+  public void testInitModelPO() throws JsonProcessingException {
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
+    ModelEntity modelEntity =
+        ModelEntity.builder()
+            .withId(1L)
+            .withName("test")
+            .withNamespace(Namespace.of("test_metalake", "test_catalog", 
"test_schema"))
+            .withComment("this is test")
+            .withProperties(ImmutableMap.of("key", "value"))
+            .withLatestVersion(1)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    ModelPO.Builder builder =
+        
ModelPO.builder().withMetalakeId(1L).withCatalogId(1L).withSchemaId(1L);
+    ModelPO modelPO = POConverters.initializeModelPO(modelEntity, builder);
+
+    assertEquals(1, modelPO.getModelId());
+    assertEquals("test", modelPO.getModelName());
+    assertEquals(1, modelPO.getMetalakeId());
+    assertEquals(1, modelPO.getCatalogId());
+    assertEquals(1, modelPO.getSchemaId());
+    assertEquals("this is test", modelPO.getModelComment());
+
+    Map<String, String> resultProperties =
+        JsonUtils.anyFieldMapper().readValue(modelPO.getModelProperties(), 
Map.class);
+    assertEquals(ImmutableMap.of("key", "value"), resultProperties);
+
+    AuditInfo resultAuditInfo =
+        JsonUtils.anyFieldMapper().readValue(modelPO.getAuditInfo(), 
AuditInfo.class);
+    assertEquals(auditInfo, resultAuditInfo);
+    assertEquals(1, modelPO.getModelLatestVersion());
+    assertEquals(0, modelPO.getDeletedAt());
+
+    // Test with null fields
+    ModelEntity modelEntityWithNull =
+        ModelEntity.builder()
+            .withId(1L)
+            .withName("test")
+            .withNamespace(Namespace.of("test_metalake", "test_catalog", 
"test_schema"))
+            .withComment(null)
+            .withProperties(null)
+            .withLatestVersion(1)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    ModelPO.Builder builderWithNull =
+        
ModelPO.builder().withMetalakeId(1L).withCatalogId(1L).withSchemaId(1L);
+    ModelPO modelPOWithNull = 
POConverters.initializeModelPO(modelEntityWithNull, builderWithNull);
+
+    assertNull(modelPOWithNull.getModelComment());
+    Map<String, String> resultPropertiesWithNull =
+        
JsonUtils.anyFieldMapper().readValue(modelPOWithNull.getModelProperties(), 
Map.class);
+    assertNull(resultPropertiesWithNull);
+  }
+
+  @Test
+  public void testFromModelPO() throws JsonProcessingException {
+    AuditInfo auditInfo =
+        
AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
+    Map<String, String> properties = ImmutableMap.of("key", "value");
+    Map<String, String> emptyProperties = Collections.emptyMap();
+    Namespace namespace = Namespace.of("test_metalake", "test_catalog", 
"test_schema");
+
+    ModelPO modelPO =
+        ModelPO.builder()
+            .withModelId(1L)
+            .withModelName("test")
+            .withMetalakeId(1L)
+            .withCatalogId(1L)
+            .withSchemaId(1L)
+            .withModelComment("this is test")
+            
.withModelProperties(JsonUtils.anyFieldMapper().writeValueAsString(properties))
+            
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo))
+            .withModelLatestVersion(1)
+            .withDeletedAt(0L)
+            .build();
+
+    ModelEntity expectedModel =
+        ModelEntity.builder()
+            .withId(1L)
+            .withName("test")
+            .withNamespace(namespace)
+            .withComment("this is test")
+            .withProperties(properties)
+            .withLatestVersion(1)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    ModelEntity convertedModel = POConverters.fromModelPO(modelPO, namespace);
+    assertEquals(expectedModel, convertedModel);
+
+    // test null fields
+    ModelPO modelPOWithNull =
+        ModelPO.builder()
+            .withModelId(1L)
+            .withModelName("test")
+            .withMetalakeId(1L)
+            .withCatalogId(1L)
+            .withSchemaId(1L)
+            .withModelComment(null)
+            
.withModelProperties(JsonUtils.anyFieldMapper().writeValueAsString(null))
+            
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo))
+            .withModelLatestVersion(1)
+            .withDeletedAt(0L)
+            .build();
+
+    ModelEntity expectedModelWithNull =
+        ModelEntity.builder()
+            .withId(1L)
+            .withName("test")
+            .withNamespace(namespace)
+            .withComment(null)
+            .withProperties(null)
+            .withLatestVersion(1)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    ModelEntity convertedModelWithNull = 
POConverters.fromModelPO(modelPOWithNull, namespace);
+    assertEquals(expectedModelWithNull, convertedModelWithNull);
+
+    // Test with empty properties
+    ModelPO modelPOWithEmptyProperties =
+        ModelPO.builder()
+            .withModelId(1L)
+            .withModelName("test")
+            .withMetalakeId(1L)
+            .withCatalogId(1L)
+            .withSchemaId(1L)
+            .withModelComment("this is test")
+            
.withModelProperties(JsonUtils.anyFieldMapper().writeValueAsString(emptyProperties))
+            
.withAuditInfo(JsonUtils.anyFieldMapper().writeValueAsString(auditInfo))
+            .withModelLatestVersion(1)
+            .withDeletedAt(0L)
+            .build();
+
+    ModelEntity expectedModelWithEmptyProperties =
+        ModelEntity.builder()
+            .withId(1L)
+            .withName("test")
+            .withNamespace(namespace)
+            .withComment("this is test")
+            .withProperties(emptyProperties)
+            .withLatestVersion(1)
+            .withAuditInfo(auditInfo)
+            .build();
+
+    ModelEntity convertedModelWithEmptyProperties =
+        POConverters.fromModelPO(modelPOWithEmptyProperties, namespace);
+    assertEquals(expectedModelWithEmptyProperties, 
convertedModelWithEmptyProperties);
+  }
+
   private static BaseMetalake createMetalake(Long id, String name, String 
comment) {
     AuditInfo auditInfo =
         
AuditInfo.builder().withCreator("creator").withCreateTime(FIX_INSTANT).build();
diff --git a/scripts/h2/schema-0.8.0-h2.sql b/scripts/h2/schema-0.8.0-h2.sql
index 495d46f80..541c60da9 100644
--- a/scripts/h2/schema-0.8.0-h2.sql
+++ b/scripts/h2/schema-0.8.0-h2.sql
@@ -289,3 +289,50 @@ CREATE TABLE IF NOT EXISTS `owner_meta` (
     KEY `idx_oid` (`owner_id`),
     KEY `idx_meid` (`metadata_object_id`)
     ) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `model_meta` (
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `model_name` VARCHAR(128) NOT NULL COMMENT 'model name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id',
+    `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id',
+    `model_comment` TEXT DEFAULT NULL COMMENT 'model comment',
+    `model_properties` MEDIUMTEXT DEFAULT NULL COMMENT 'model properties',
+    `model_latest_version` INT UNSIGNED DEFAULT 0 COMMENT 'model latest 
version',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'model audit info',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model deleted 
at',
+    PRIMARY KEY (`model_id`),
+    UNIQUE KEY `uk_sid_mn_del` (`schema_id`, `model_name`, `deleted_at`),
+    KEY `idx_mmid` (`metalake_id`),
+    KEY `idx_mcid` (`catalog_id`)
+) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `model_version_info` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id',
+    `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id',
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `version` INT UNSIGNED NOT NULL COMMENT 'model version',
+    `model_version_comment` TEXT DEFAULT NULL COMMENT 'model version comment',
+    `model_version_properties` MEDIUMTEXT DEFAULT NULL COMMENT 'model version 
properties',
+    `model_version_uri` TEXT NOT NULL COMMENT 'model storage uri',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'model version audit info',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version 
deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_mid_ver_del` (`model_id`, `version`, `deleted_at`),
+    KEY `idx_vmid` (`metalake_id`),
+    KEY `idx_vcid` (`catalog_id`),
+    KEY `idx_vsid` (`schema_id`)
+) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `model_version_alias_rel` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `model_version` INT UNSIGNED NOT NULL COMMENT 'model version',
+    `model_version_alias` VARCHAR(128) NOT NULL COMMENT 'model version alias',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version 
alias deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_mi_mv_mva_del` (`model_id`, `model_version`, 
`model_version_alias`, `deleted_at`),
+    KEY `idx_mva` (`model_version_alias`)
+) ENGINE=InnoDB;
diff --git a/scripts/h2/upgrade-0.7.0-to-0.8.0-h2.sql 
b/scripts/h2/upgrade-0.7.0-to-0.8.0-h2.sql
index 89de7de2e..60c89a86e 100644
--- a/scripts/h2/upgrade-0.7.0-to-0.8.0-h2.sql
+++ b/scripts/h2/upgrade-0.7.0-to-0.8.0-h2.sql
@@ -18,3 +18,50 @@
 --
 ALTER TABLE `role_meta_securable_object` ALTER COLUMN `privilege_names` 
TEXT(81920);
 ALTER TABLE `role_meta_securable_object` ALTER COLUMN `privilege_conditions` 
TEXT(81920);
+
+CREATE TABLE IF NOT EXISTS `model_meta` (
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `model_name` VARCHAR(128) NOT NULL COMMENT 'model name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id',
+    `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id',
+    `model_comment` TEXT DEFAULT NULL COMMENT 'model comment',
+    `model_properties` MEDIUMTEXT DEFAULT NULL COMMENT 'model properties',
+    `model_latest_version` INT UNSIGNED DEFAULT 0 COMMENT 'model latest 
version',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'model audit info',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model deleted 
at',
+    PRIMARY KEY (`model_id`),
+    UNIQUE KEY `uk_sid_mn_del` (`schema_id`, `model_name`, `deleted_at`),
+    KEY `idx_mmid` (`metalake_id`),
+    KEY `idx_mcid` (`catalog_id`)
+) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `model_version_info` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id',
+    `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id',
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `version` INT UNSIGNED NOT NULL COMMENT 'model version',
+    `model_version_comment` TEXT DEFAULT NULL COMMENT 'model version comment',
+    `model_version_properties` MEDIUMTEXT DEFAULT NULL COMMENT 'model version 
properties',
+    `model_version_uri` TEXT NOT NULL COMMENT 'model storage uri',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'model version audit info',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version 
deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_mid_ver_del` (`model_id`, `version`, `deleted_at`),
+    KEY `idx_vmid` (`metalake_id`),
+    KEY `idx_vcid` (`catalog_id`),
+    KEY `idx_vsid` (`schema_id`)
+) ENGINE=InnoDB;
+
+CREATE TABLE IF NOT EXISTS `model_version_alias_rel` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `model_version` INT UNSIGNED NOT NULL COMMENT 'model version',
+    `model_version_alias` VARCHAR(128) NOT NULL COMMENT 'model version alias',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version 
alias deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_mi_mv_mva_del` (`model_id`, `model_version`, 
`model_version_alias`, `deleted_at`),
+    KEY `idx_mva` (`model_version_alias`)
+) ENGINE=InnoDB;
diff --git a/scripts/mysql/schema-0.8.0-mysql.sql 
b/scripts/mysql/schema-0.8.0-mysql.sql
index 2c8a46305..07b8e146c 100644
--- a/scripts/mysql/schema-0.8.0-mysql.sql
+++ b/scripts/mysql/schema-0.8.0-mysql.sql
@@ -280,3 +280,50 @@ CREATE TABLE IF NOT EXISTS `owner_meta` (
     KEY `idx_oid` (`owner_id`),
     KEY `idx_meid` (`metadata_object_id`)
     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'owner 
relation';
+
+CREATE TABLE IF NOT EXISTS `model_meta` (
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `model_name` VARCHAR(128) NOT NULL COMMENT 'model name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id',
+    `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id',
+    `model_comment` TEXT DEFAULT NULL COMMENT 'model comment',
+    `model_properties` MEDIUMTEXT DEFAULT NULL COMMENT 'model properties',
+    `model_latest_version` INT UNSIGNED DEFAULT 0 COMMENT 'model latest 
version',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'model audit info',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model deleted 
at',
+    PRIMARY KEY (`model_id`),
+    UNIQUE KEY `uk_sid_mn_del` (`schema_id`, `model_name`, `deleted_at`),
+    KEY `idx_mid` (`metalake_id`),
+    KEY `idx_cid` (`catalog_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'model 
metadata';
+
+CREATE TABLE IF NOT EXISTS `model_version_info` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id',
+    `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id',
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `version` INT UNSIGNED NOT NULL COMMENT 'model version',
+    `model_version_comment` TEXT DEFAULT NULL COMMENT 'model version comment',
+    `model_version_properties` MEDIUMTEXT DEFAULT NULL COMMENT 'model version 
properties',
+    `model_version_uri` TEXT NOT NULL COMMENT 'model storage uri',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'model version audit info',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version 
deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_mid_ver_del` (`model_id`, `version`, `deleted_at`),
+    KEY `idx_mid` (`metalake_id`),
+    KEY `idx_cid` (`catalog_id`),
+    KEY `idx_sid` (`schema_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'model 
version info';
+
+CREATE TABLE IF NOT EXISTS `model_version_alias_rel` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `model_version` INT UNSIGNED NOT NULL COMMENT 'model version',
+    `model_version_alias` VARCHAR(128) NOT NULL COMMENT 'model version alias',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version 
alias deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_mi_mv_mva_del` (`model_id`, `model_version`, 
`model_version_alias`, `deleted_at`),
+    KEY `idx_mva` (`model_version_alias`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 
'model_version_alias_rel';
diff --git a/scripts/mysql/upgrade-0.7.0-to-0.8.0-mysql.sql 
b/scripts/mysql/upgrade-0.7.0-to-0.8.0-mysql.sql
index fb591ae56..7858237c7 100644
--- a/scripts/mysql/upgrade-0.7.0-to-0.8.0-mysql.sql
+++ b/scripts/mysql/upgrade-0.7.0-to-0.8.0-mysql.sql
@@ -18,3 +18,50 @@
 --
 ALTER TABLE `role_meta_securable_object` MODIFY COLUMN `privilege_names` 
TEXT(81920);
 ALTER TABLE `role_meta_securable_object` MODIFY COLUMN `privilege_conditions` 
TEXT(81920);
+
+CREATE TABLE IF NOT EXISTS `model_meta` (
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `model_name` VARCHAR(128) NOT NULL COMMENT 'model name',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id',
+    `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id',
+    `model_comment` TEXT DEFAULT NULL COMMENT 'model comment',
+    `model_properties` MEDIUMTEXT DEFAULT NULL COMMENT 'model properties',
+    `model_latest_version` INT UNSIGNED DEFAULT 0 COMMENT 'model latest 
version',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'model audit info',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model deleted 
at',
+    PRIMARY KEY (`model_id`),
+    UNIQUE KEY `uk_sid_mn_del` (`schema_id`, `model_name`, `deleted_at`),
+    KEY `idx_mid` (`metalake_id`),
+    KEY `idx_cid` (`catalog_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'model 
metadata';
+
+CREATE TABLE IF NOT EXISTS `model_version_info` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `metalake_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'metalake id',
+    `catalog_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'catalog id',
+    `schema_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'schema id',
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `version` INT UNSIGNED NOT NULL COMMENT 'model version',
+    `model_version_comment` TEXT DEFAULT NULL COMMENT 'model version comment',
+    `model_version_properties` MEDIUMTEXT DEFAULT NULL COMMENT 'model version 
properties',
+    `model_version_uri` TEXT NOT NULL COMMENT 'model storage uri',
+    `audit_info` MEDIUMTEXT NOT NULL COMMENT 'model version audit info',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version 
deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_mid_ver_del` (`model_id`, `version`, `deleted_at`),
+    KEY `idx_mid` (`metalake_id`),
+    KEY `idx_cid` (`catalog_id`),
+    KEY `idx_sid` (`schema_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 'model 
version info';
+
+CREATE TABLE IF NOT EXISTS `model_version_alias_rel` (
+    `id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT 'auto increment 
id',
+    `model_id` BIGINT(20) UNSIGNED NOT NULL COMMENT 'model id',
+    `model_version` INT UNSIGNED NOT NULL COMMENT 'model version',
+    `model_version_alias` VARCHAR(128) NOT NULL COMMENT 'model version alias',
+    `deleted_at` BIGINT(20) UNSIGNED NOT NULL DEFAULT 0 COMMENT 'model version 
alias deleted at',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `uk_mi_mv_mva_del` (`model_id`, `model_version`, 
`model_version_alias`, `deleted_at`),
+    KEY `idx_mva` (`model_version_alias`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT 
'model_version_alias_rel';
diff --git a/scripts/postgresql/schema-0.8.0-postgresql.sql 
b/scripts/postgresql/schema-0.8.0-postgresql.sql
index 0f483a20f..fea045843 100644
--- a/scripts/postgresql/schema-0.8.0-postgresql.sql
+++ b/scripts/postgresql/schema-0.8.0-postgresql.sql
@@ -500,3 +500,87 @@ COMMENT ON COLUMN owner_meta.current_version IS 'owner 
relation current version'
 COMMENT ON COLUMN owner_meta.last_version IS 'owner relation last version';
 COMMENT ON COLUMN owner_meta.deleted_at IS 'owner relation deleted at';
 
+
+CREATE TABLE IF NOT EXISTS model_meta (
+    model_id BIGINT NOT NULL,
+    model_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    catalog_id BIGINT NOT NULL,
+    schema_id BIGINT NOT NULL,
+    model_comment VARCHAR(65535) DEFAULT NULL,
+    model_properties TEXT DEFAULT NULL,
+    model_latest_version INT NOT NULL DEFAULT 0,
+    audit_info TEXT NOT NULL,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (model_id),
+    UNIQUE (schema_id, model_name, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_metalake_id ON model_meta (metalake_id);
+CREATE INDEX IF NOT EXISTS idx_catalog_id ON model_meta (catalog_id);
+COMMENT ON TABLE model_meta IS 'model metadata';
+
+COMMENT ON COLUMN model_meta.model_id IS 'model id';
+COMMENT ON COLUMN model_meta.model_name IS 'model name';
+COMMENT ON COLUMN model_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN model_meta.catalog_id IS 'catalog id';
+COMMENT ON COLUMN model_meta.schema_id IS 'schema id';
+COMMENT ON COLUMN model_meta.model_comment IS 'model comment';
+COMMENT ON COLUMN model_meta.model_properties IS 'model properties';
+COMMENT ON COLUMN model_meta.model_latest_version IS 'model max version';
+COMMENT ON COLUMN model_meta.audit_info IS 'model audit info';
+COMMENT ON COLUMN model_meta.deleted_at IS 'model deleted at';
+
+
+CREATE TABLE IF NOT EXISTS model_version_info (
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+    metalake_id BIGINT NOT NULL,
+    catalog_id BIGINT NOT NULL,
+    schema_id BIGINT NOT NULL,
+    model_id BIGINT NOT NULL,
+    version INT NOT NULL,
+    model_version_comment VARCHAR(65535) DEFAULT NULL,
+    model_version_properties TEXT DEFAULT NULL,
+    model_version_uri TEXT NOT NULL,
+    audit_info TEXT NOT NULL,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (id),
+    UNIQUE (model_id, version, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_metalake_id ON model_version_info (metalake_id);
+CREATE INDEX IF NOT EXISTS idx_catalog_id ON model_version_info (catalog_id);
+CREATE INDEX IF NOT EXISTS idx_schema_id ON model_version_info (schema_id);
+COMMENT ON TABLE model_version_info IS 'model version information';
+
+COMMENT ON COLUMN model_version_info.id IS 'auto increment id';
+COMMENT ON COLUMN model_version_info.metalake_id IS 'metalake id';
+COMMENT ON COLUMN model_version_info.catalog_id IS 'catalog id';
+COMMENT ON COLUMN model_version_info.schema_id IS 'schema id';
+COMMENT ON COLUMN model_version_info.model_id IS 'model id';
+COMMENT ON COLUMN model_version_info.version IS 'model version';
+COMMENT ON COLUMN model_version_info.model_version_comment IS 'model version 
comment';
+COMMENT ON COLUMN model_version_info.model_version_properties IS 'model 
version properties';
+COMMENT ON COLUMN model_version_info.model_version_uri IS 'model storage uri';
+COMMENT ON COLUMN model_version_info.audit_info IS 'model version audit info';
+COMMENT ON COLUMN model_version_info.deleted_at IS 'model version deleted at';
+
+
+CREATE TABLE IF NOT EXISTS model_version_alias_rel (
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+    model_id BIGINT NOT NULL,
+    model_version INT NOT NULL,
+    model_version_alias VARCHAR(128) NOT NULL,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (id),
+    UNIQUE (model_id, model_version, model_version_alias, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_model_version_alias on model_version_alias_rel 
(model_version_alias);
+COMMENT ON TABLE model_version_alias_rel IS 'model version alias relation';
+
+COMMENT ON COLUMN model_version_alias_rel.id IS 'auto increment id';
+COMMENT ON COLUMN model_version_alias_rel.model_id IS 'model id';
+COMMENT ON COLUMN model_version_alias_rel.model_version IS 'model version';
+COMMENT ON COLUMN model_version_alias_rel.model_version_alias IS 'model 
version alias';
+COMMENT ON COLUMN model_version_alias_rel.deleted_at IS 'model version alias 
deleted at';
diff --git a/scripts/postgresql/upgrade-0.7.0-to-0.8.0-postgresql.sql 
b/scripts/postgresql/upgrade-0.7.0-to-0.8.0-postgresql.sql
index 574351ec4..a94c4ab22 100644
--- a/scripts/postgresql/upgrade-0.7.0-to-0.8.0-postgresql.sql
+++ b/scripts/postgresql/upgrade-0.7.0-to-0.8.0-postgresql.sql
@@ -18,3 +18,87 @@
 --
 ALTER TABLE role_meta_securable_object ALTER COLUMN privilege_names TYPE 
VARCHAR(81920);
 ALTER TABLE role_meta_securable_object ALTER COLUMN privilege_conditions TYPE 
VARCHAR(81920);
+
+CREATE TABLE IF NOT EXISTS model_meta (
+    model_id BIGINT NOT NULL,
+    model_name VARCHAR(128) NOT NULL,
+    metalake_id BIGINT NOT NULL,
+    catalog_id BIGINT NOT NULL,
+    schema_id BIGINT NOT NULL,
+    model_comment VARCHAR(65535) DEFAULT NULL,
+    model_properties TEXT DEFAULT NULL,
+    model_latest_version INT NOT NULL DEFAULT 0,
+    audit_info TEXT NOT NULL,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (model_id),
+    UNIQUE (schema_id, model_name, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_metalake_id ON model_meta (metalake_id);
+CREATE INDEX IF NOT EXISTS idx_catalog_id ON model_meta (catalog_id);
+COMMENT ON TABLE model_meta IS 'model metadata';
+
+COMMENT ON COLUMN model_meta.model_id IS 'model id';
+COMMENT ON COLUMN model_meta.model_name IS 'model name';
+COMMENT ON COLUMN model_meta.metalake_id IS 'metalake id';
+COMMENT ON COLUMN model_meta.catalog_id IS 'catalog id';
+COMMENT ON COLUMN model_meta.schema_id IS 'schema id';
+COMMENT ON COLUMN model_meta.model_comment IS 'model comment';
+COMMENT ON COLUMN model_meta.model_properties IS 'model properties';
+COMMENT ON COLUMN model_meta.model_latest_version IS 'model max version';
+COMMENT ON COLUMN model_meta.audit_info IS 'model audit info';
+COMMENT ON COLUMN model_meta.deleted_at IS 'model deleted at';
+
+
+CREATE TABLE IF NOT EXISTS model_version_info (
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+    metalake_id BIGINT NOT NULL,
+    catalog_id BIGINT NOT NULL,
+    schema_id BIGINT NOT NULL,
+    model_id BIGINT NOT NULL,
+    version INT NOT NULL,
+    model_version_comment VARCHAR(65535) DEFAULT NULL,
+    model_version_properties TEXT DEFAULT NULL,
+    model_version_uri TEXT NOT NULL,
+    audit_info TEXT NOT NULL,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (id),
+    UNIQUE (model_id, version, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_metalake_id ON model_version_info (metalake_id);
+CREATE INDEX IF NOT EXISTS idx_catalog_id ON model_version_info (catalog_id);
+CREATE INDEX IF NOT EXISTS idx_schema_id ON model_version_info (schema_id);
+COMMENT ON TABLE model_version_info IS 'model version information';
+
+COMMENT ON COLUMN model_version_info.id IS 'auto increment id';
+COMMENT ON COLUMN model_version_info.metalake_id IS 'metalake id';
+COMMENT ON COLUMN model_version_info.catalog_id IS 'catalog id';
+COMMENT ON COLUMN model_version_info.schema_id IS 'schema id';
+COMMENT ON COLUMN model_version_info.model_id IS 'model id';
+COMMENT ON COLUMN model_version_info.version IS 'model version';
+COMMENT ON COLUMN model_version_info.model_version_comment IS 'model version 
comment';
+COMMENT ON COLUMN model_version_info.model_version_properties IS 'model 
version properties';
+COMMENT ON COLUMN model_version_info.model_version_uri IS 'model storage uri';
+COMMENT ON COLUMN model_version_info.audit_info IS 'model version audit info';
+COMMENT ON COLUMN model_version_info.deleted_at IS 'model version deleted at';
+
+
+CREATE TABLE IF NOT EXISTS model_version_alias_rel (
+    id BIGINT NOT NULL GENERATED BY DEFAULT AS IDENTITY,
+    model_id BIGINT NOT NULL,
+    model_version INT NOT NULL,
+    model_version_alias VARCHAR(128) NOT NULL,
+    deleted_at BIGINT NOT NULL DEFAULT 0,
+    PRIMARY KEY (id),
+    UNIQUE (model_id, model_version, model_version_alias, deleted_at)
+    );
+
+CREATE INDEX IF NOT EXISTS idx_model_version_alias on model_version_alias_rel 
(model_version_alias);
+COMMENT ON TABLE model_version_alias_rel IS 'model version alias relation';
+
+COMMENT ON COLUMN model_version_alias_rel.id IS 'auto increment id';
+COMMENT ON COLUMN model_version_alias_rel.model_id IS 'model id';
+COMMENT ON COLUMN model_version_alias_rel.model_version IS 'model version';
+COMMENT ON COLUMN model_version_alias_rel.model_version_alias IS 'model 
version alias';
+COMMENT ON COLUMN model_version_alias_rel.deleted_at IS 'model version alias 
deleted at';

Reply via email to