This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new e8eeaca93 [#3064] feat(jdbc-doris): Supports partition management in 
Doris catalog (#3961)
e8eeaca93 is described below

commit e8eeaca93dd4b5a8e404e73807446c5ca422001a
Author: XiaoZ <[email protected]>
AuthorDate: Wed Jul 24 14:19:27 2024 +0800

    [#3064] feat(jdbc-doris): Supports partition management in Doris catalog 
(#3961)
    
    ### What changes were proposed in this pull request?
    
    Supports partition management in Doris catalog.
    
    ### Why are the changes needed?
    
    Fix: #3064
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UT & IT.
    
    ---------
    
    Co-authored-by: zhanghan18 <[email protected]>
---
 .../rel/expressions/literals/Literals.java         |   2 +-
 .../catalog/jdbc/JdbcCatalogOperations.java        |   4 +
 .../apache/gravitino/catalog/jdbc/JdbcTable.java   |  52 ++-
 .../jdbc/operation/JdbcTableOperations.java        |   5 +-
 .../operation/JdbcTablePartitionOperations.java    |  52 +++
 .../catalog/jdbc/operation/TableOperation.java     |   4 +
 .../DorisTablePartitionPropertiesMetadata.java     |  64 +++
 .../doris/converter/DorisExceptionConverter.java   |  27 ++
 .../doris/operation/DorisTableOperations.java      |   7 +
 .../operation/DorisTablePartitionOperations.java   | 334 +++++++++++++++
 .../doris/integration/test/CatalogDorisIT.java     | 457 +++++++++++++++------
 .../catalog/doris/operation/TestDoris.java         |   7 +-
 .../TestDorisTablePartitionOperations.java         | 360 ++++++++++++++++
 .../org/apache/gravitino/connector/BaseTable.java  |   7 +-
 14 files changed, 1228 insertions(+), 154 deletions(-)

diff --git 
a/api/src/main/java/org/apache/gravitino/rel/expressions/literals/Literals.java 
b/api/src/main/java/org/apache/gravitino/rel/expressions/literals/Literals.java
index f9f6eb677..f789afbb8 100644
--- 
a/api/src/main/java/org/apache/gravitino/rel/expressions/literals/Literals.java
+++ 
b/api/src/main/java/org/apache/gravitino/rel/expressions/literals/Literals.java
@@ -39,7 +39,7 @@ public class Literals {
    * @param value the literal value
    * @param dataType the data type of the literal
    * @param <T> the JVM type of value held by the literal
-   * @return a new {@link org.apache.gravitino.rel.expressions.Literal} 
instance
+   * @return a new {@link Literal} instance
    */
   public static <T> LiteralImpl<T> of(T value, Type dataType) {
     return new LiteralImpl<>(value, dataType);
diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
index 8961df99e..4d2747a66 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
@@ -333,6 +333,8 @@ public class JdbcCatalogOperations implements 
CatalogOperations, SupportsSchemas
         .withProperties(properties)
         .withIndexes(load.index())
         .withPartitioning(load.partitioning())
+        .withDatabaseName(databaseName)
+        .withTableOperation(tableOperation)
         .build();
   }
 
@@ -450,6 +452,8 @@ public class JdbcCatalogOperations implements 
CatalogOperations, SupportsSchemas
         
.withProperties(jdbcTablePropertiesMetadata.convertFromJdbcProperties(resultProperties))
         .withPartitioning(partitioning)
         .withIndexes(indexes)
+        .withDatabaseName(databaseName)
+        .withTableOperation(tableOperation)
         .build();
   }
 
diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcTable.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcTable.java
index 6a5fc70e8..54a221866 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcTable.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcTable.java
@@ -20,28 +20,69 @@ package org.apache.gravitino.catalog.jdbc;
 
 import com.google.common.collect.Maps;
 import java.util.Map;
-import lombok.Getter;
 import lombok.ToString;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.gravitino.catalog.jdbc.operation.TableOperation;
 import org.apache.gravitino.connector.BaseTable;
 import org.apache.gravitino.connector.TableOperations;
+import org.apache.gravitino.rel.SupportsPartitions;
 
 /** Represents a Jdbc Table entity in the jdbc table. */
 @ToString
-@Getter
 public class JdbcTable extends BaseTable {
+  private String databaseName;
+  private TableOperation tableOperation;
 
   private JdbcTable() {}
 
   @Override
   protected TableOperations newOps() {
-    // TODO: Implement this method when we have the JDBC table operations.
-    throw new UnsupportedOperationException("JdbcTable does not support 
TableOperations.");
+    if (ArrayUtils.isEmpty(partitioning)) {
+      throw new UnsupportedOperationException(
+          "Table partition operation is not supported for non-partitioned 
table: " + name);
+    }
+    return tableOperation.createJdbcTablePartitionOperations(this);
+  }
+
+  @Override
+  public SupportsPartitions supportPartitions() throws 
UnsupportedOperationException {
+    return (SupportsPartitions) ops();
+  }
+
+  public String databaseName() {
+    return databaseName;
   }
 
   /** A builder class for constructing JdbcTable instances. */
   public static class Builder extends BaseTableBuilder<Builder, JdbcTable> {
+    private String databaseName;
+    private TableOperation tableOperation;
+
+    /**
+     * Sets the name of database.
+     *
+     * @param databaseName The name of database.
+     * @return This Builder instance.
+     */
+    public Builder withDatabaseName(String databaseName) {
+      this.databaseName = databaseName;
+      return this;
+    }
+
+    /**
+     * Sets the table operation to be used for partition operations.
+     *
+     * @param tableOperation The instance of TableOperation.
+     * @return This Builder instance.
+     */
+    public Builder withTableOperation(TableOperation tableOperation) {
+      this.tableOperation = tableOperation;
+      return this;
+    }
+
     /** Creates a new instance of {@link Builder}. */
     private Builder() {}
+
     /**
      * Internal method to build a JdbcTable instance using the provided values.
      *
@@ -58,6 +99,9 @@ public class JdbcTable extends BaseTable {
       jdbcTable.partitioning = partitioning;
       jdbcTable.sortOrders = sortOrders;
       jdbcTable.indexes = indexes;
+      jdbcTable.proxyPlugin = proxyPlugin;
+      jdbcTable.databaseName = databaseName;
+      jdbcTable.tableOperation = tableOperation;
       return jdbcTable;
     }
 
diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
index bc431f92b..2688c6aa1 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
@@ -155,7 +155,7 @@ public abstract class JdbcTableOperations implements 
TableOperation {
     JdbcTable.Builder builder = null;
     while (tablesResult.next() && !found) {
       if (Objects.equals(tablesResult.getString("TABLE_NAME"), tableName)) {
-        builder = getBasicJdbcTableInfo(tablesResult);
+        builder = 
getBasicJdbcTableInfo(tablesResult).withDatabaseName(databaseName);
         found = true;
       }
     }
@@ -210,7 +210,8 @@ public abstract class JdbcTableOperations implements 
TableOperation {
 
       // 6.Leave the information to the bottom layer to append the table
       correctJdbcTableFields(connection, databaseName, tableName, 
jdbcTableBuilder);
-      return jdbcTableBuilder.build();
+
+      return jdbcTableBuilder.withTableOperation(this).build();
     } catch (SQLException e) {
       throw exceptionMapper.toGravitinoException(e);
     }
diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTablePartitionOperations.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTablePartitionOperations.java
new file mode 100644
index 000000000..787c1539b
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTablePartitionOperations.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.jdbc.operation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import javax.sql.DataSource;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.connector.TableOperations;
+import org.apache.gravitino.rel.SupportsPartitions;
+
+public abstract class JdbcTablePartitionOperations implements TableOperations, 
SupportsPartitions {
+  protected final DataSource dataSource;
+  protected final JdbcTable loadedTable;
+
+  protected JdbcTablePartitionOperations(DataSource dataSource, JdbcTable 
loadedTable) {
+    checkArgument(dataSource != null, "dataSource is null");
+    checkArgument(loadedTable != null, "loadedTable is null");
+    this.dataSource = dataSource;
+    this.loadedTable = loadedTable;
+  }
+
+  protected Connection getConnection(String databaseName) throws SQLException {
+    Connection connection = dataSource.getConnection();
+    connection.setCatalog(databaseName);
+    return connection;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Nothing to be closed.
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
index 4f3c04dad..f22bd7453 100644
--- 
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
+++ 
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
@@ -114,4 +114,8 @@ public interface TableOperation {
    * @param tableName The name of the table.
    */
   boolean purge(String databaseName, String tableName);
+
+  default JdbcTablePartitionOperations 
createJdbcTablePartitionOperations(JdbcTable loadedTable) {
+    throw new UnsupportedOperationException("Table partition operation is not 
supported yet");
+  }
 }
diff --git 
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/DorisTablePartitionPropertiesMetadata.java
 
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/DorisTablePartitionPropertiesMetadata.java
new file mode 100644
index 000000000..1f7877e09
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/DorisTablePartitionPropertiesMetadata.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.doris;
+
+import static 
org.apache.gravitino.connector.PropertyEntry.stringReservedPropertyEntry;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.connector.BasePropertiesMetadata;
+import org.apache.gravitino.connector.PropertyEntry;
+
+public class DorisTablePartitionPropertiesMetadata extends 
BasePropertiesMetadata {
+  public static final String NAME = "PartitionName";
+  public static final String VALUES_RANGE = "Range";
+  public static final String ID = "PartitionId";
+  public static final String KEY = "PartitionKey";
+  public static final String VISIBLE_VERSION = "VisibleVersion";
+  public static final String VISIBLE_VERSION_TIME = "VisibleVersionTime";
+  public static final String STATE = "State";
+  public static final String DATA_SIZE = "DataSize";
+  public static final String IS_IN_MEMORY = "IsInMemory";
+  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
+
+  static {
+    List<PropertyEntry<?>> propertyEntries =
+        ImmutableList.of(
+            stringReservedPropertyEntry(NAME, "name of the partition", false),
+            stringReservedPropertyEntry(VALUES_RANGE, "value range of the 
partition", false),
+            stringReservedPropertyEntry(ID, "id of the partition", false),
+            stringReservedPropertyEntry(KEY, "partition column of the 
partition", false),
+            stringReservedPropertyEntry(VISIBLE_VERSION, "visible version of 
the partition", false),
+            stringReservedPropertyEntry(
+                VISIBLE_VERSION_TIME, "visible version time of the partition", 
false),
+            stringReservedPropertyEntry(STATE, "state of the partition", 
false),
+            stringReservedPropertyEntry(DATA_SIZE, "data size of the 
partition", false),
+            stringReservedPropertyEntry(
+                IS_IN_MEMORY, "whether data of the partition is stored in 
memory", false));
+
+    PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, 
PropertyEntry::getName);
+  }
+
+  @Override
+  protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
+    return PROPERTIES_METADATA;
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/converter/DorisExceptionConverter.java
 
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/converter/DorisExceptionConverter.java
index 897354d50..9c1cc11a2 100644
--- 
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/converter/DorisExceptionConverter.java
+++ 
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/converter/DorisExceptionConverter.java
@@ -24,8 +24,10 @@ import java.util.regex.Pattern;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
 import org.apache.gravitino.exceptions.NoSuchColumnException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.exceptions.UnauthorizedException;
@@ -44,6 +46,8 @@ public class DorisExceptionConverter extends 
JdbcExceptionConverter {
   static final int CODE_UNAUTHORIZED = 1045;
   static final int CODE_NO_SUCH_COLUMN = 1054;
   static final int CODE_OTHER = 1105;
+  static final int CODE_DELETE_NON_EXISTING_PARTITION = 1507;
+  static final int CODE_PARTITION_ALREADY_EXISTS = 1517;
 
   private static final String DATABASE_ALREADY_EXISTS_PATTERN_STRING =
       ".*?detailMessage = Can't create database '.*?'; database exists";
@@ -66,6 +70,18 @@ public class DorisExceptionConverter extends 
JdbcExceptionConverter {
   private static final Pattern TABLE_NOT_EXIST_PATTERN =
       Pattern.compile(TABLE_NOT_EXIST_PATTERN_STRING);
 
+  private static final String DELETE_NON_EXISTING_PARTITION_STRING =
+      ".*?detailMessage = Error in list of partitions to .*?";
+
+  private static final Pattern DELETE_NON_EXISTING_PARTITION =
+      Pattern.compile(DELETE_NON_EXISTING_PARTITION_STRING);
+
+  private static final String PARTITION_ALREADY_EXISTS_STRING =
+      ".*?detailMessage = Duplicate partition name .*?";
+
+  private static final Pattern PARTITION_ALREADY_EXISTS_PARTITION =
+      Pattern.compile(PARTITION_ALREADY_EXISTS_STRING);
+
   @SuppressWarnings("FormatStringAnnotation")
   @Override
   public GravitinoRuntimeException toGravitinoException(SQLException se) {
@@ -88,6 +104,10 @@ public class DorisExceptionConverter extends 
JdbcExceptionConverter {
         return new UnauthorizedException(se, se.getMessage());
       case CODE_NO_SUCH_COLUMN:
         return new NoSuchColumnException(se, se.getMessage());
+      case CODE_DELETE_NON_EXISTING_PARTITION:
+        return new NoSuchPartitionException(se, se.getMessage());
+      case CODE_PARTITION_ALREADY_EXISTS:
+        return new PartitionAlreadyExistsException(se, se.getMessage());
       default:
         return new GravitinoRuntimeException(se, se.getMessage());
     }
@@ -114,6 +134,13 @@ public class DorisExceptionConverter extends 
JdbcExceptionConverter {
       return CODE_NO_SUCH_TABLE;
     }
 
+    if (DELETE_NON_EXISTING_PARTITION.matcher(message).matches()) {
+      return CODE_DELETE_NON_EXISTING_PARTITION;
+    }
+
+    if (PARTITION_ALREADY_EXISTS_PARTITION.matcher(message).matches()) {
+      return CODE_PARTITION_ALREADY_EXISTS;
+    }
     return CODE_OTHER;
   }
 }
diff --git 
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
 
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
index e54f03ce8..b25bd14c3 100644
--- 
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
+++ 
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
@@ -46,6 +46,7 @@ import org.apache.gravitino.catalog.doris.utils.DorisUtils;
 import org.apache.gravitino.catalog.jdbc.JdbcColumn;
 import org.apache.gravitino.catalog.jdbc.JdbcTable;
 import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
+import 
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
 import org.apache.gravitino.exceptions.NoSuchColumnException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
@@ -83,6 +84,12 @@ public class DorisTableOperations extends 
JdbcTableOperations {
     }
   }
 
+  @Override
+  public JdbcTablePartitionOperations 
createJdbcTablePartitionOperations(JdbcTable loadedTable) {
+    return new DorisTablePartitionOperations(
+        dataSource, loadedTable, exceptionMapper, typeConverter);
+  }
+
   @Override
   protected String generateCreateTableSql(
       String tableName,
diff --git 
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTablePartitionOperations.java
 
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTablePartitionOperations.java
new file mode 100644
index 000000000..c3e5a2bba
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTablePartitionOperations.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.doris.operation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.DATA_SIZE;
+import static 
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.ID;
+import static 
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.IS_IN_MEMORY;
+import static 
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.KEY;
+import static 
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.NAME;
+import static 
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.STATE;
+import static 
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VALUES_RANGE;
+import static 
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VISIBLE_VERSION;
+import static 
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VISIBLE_VERSION_TIME;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import 
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Type;
+
+public final class DorisTablePartitionOperations extends 
JdbcTablePartitionOperations {
+  private static final String PARTITION_TYPE_VALUE_PATTERN_STRING =
+      "types: \\[([^\\]]+)\\]; keys: \\[([^\\]]+)\\];";
+  private static final Pattern PARTITION_TYPE_VALUE_PATTERN =
+      Pattern.compile(PARTITION_TYPE_VALUE_PATTERN_STRING);
+
+  private final JdbcExceptionConverter exceptionConverter;
+  private final JdbcTypeConverter typeConverter;
+
+  public DorisTablePartitionOperations(
+      DataSource dataSource,
+      JdbcTable loadedTable,
+      JdbcExceptionConverter exceptionConverter,
+      JdbcTypeConverter typeConverter) {
+    super(dataSource, loadedTable);
+    checkArgument(exceptionConverter != null, "exceptionConverter is null");
+    checkArgument(typeConverter != null, "typeConverter is null");
+    this.exceptionConverter = exceptionConverter;
+    this.typeConverter = typeConverter;
+  }
+
+  @Override
+  public String[] listPartitionNames() {
+    try (Connection connection = getConnection(loadedTable.databaseName())) {
+      String showPartitionsSql = String.format("SHOW PARTITIONS FROM `%s`", 
loadedTable.name());
+      try (Statement statement = connection.createStatement();
+          ResultSet result = statement.executeQuery(showPartitionsSql)) {
+        ImmutableList.Builder<String> partitionNames = ImmutableList.builder();
+        while (result.next()) {
+          partitionNames.add(result.getString("PartitionName"));
+        }
+        return partitionNames.build().toArray(new String[0]);
+      }
+    } catch (SQLException e) {
+      throw exceptionConverter.toGravitinoException(e);
+    }
+  }
+
+  @Override
+  public Partition[] listPartitions() {
+    try (Connection connection = getConnection(loadedTable.databaseName())) {
+      Transform partitionInfo = loadedTable.partitioning()[0];
+      Map<String, Type> columnTypes = getColumnType(connection);
+      String showPartitionsSql = String.format("SHOW PARTITIONS FROM `%s`", 
loadedTable.name());
+      try (Statement statement = connection.createStatement();
+          ResultSet result = statement.executeQuery(showPartitionsSql)) {
+        ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
+        while (result.next()) {
+          partitions.add(fromDorisPartition(result, partitionInfo, 
columnTypes));
+        }
+        return partitions.build().toArray(new Partition[0]);
+      }
+    } catch (SQLException e) {
+      throw exceptionConverter.toGravitinoException(e);
+    }
+  }
+
+  @Override
+  public Partition getPartition(String partitionName) throws 
NoSuchPartitionException {
+    try (Connection connection = getConnection(loadedTable.databaseName())) {
+      Transform partitionInfo = loadedTable.partitioning()[0];
+      Map<String, Type> columnTypes = getColumnType(connection);
+      String showPartitionsSql =
+          String.format(
+              "SHOW PARTITIONS FROM `%s` WHERE PartitionName = \"%s\"",
+              loadedTable.name(), partitionName);
+      try (Statement statement = connection.createStatement();
+          ResultSet result = statement.executeQuery(showPartitionsSql)) {
+        if (result.next()) {
+          return fromDorisPartition(result, partitionInfo, columnTypes);
+        }
+      }
+    } catch (SQLException e) {
+      throw exceptionConverter.toGravitinoException(e);
+    }
+    throw new NoSuchPartitionException("Partition %s does not exist", 
partitionName);
+  }
+
+  @Override
+  public Partition addPartition(Partition partition) throws 
PartitionAlreadyExistsException {
+    try (Connection connection = getConnection(loadedTable.databaseName())) {
+      Transform partitionInfo = loadedTable.partitioning()[0];
+
+      String addPartitionSqlFormat = "ALTER TABLE `%s` ADD PARTITION `%s` 
VALUES %s";
+      String partitionValues;
+      Partition added;
+
+      if (partition instanceof RangePartition) {
+        Preconditions.checkArgument(
+            partitionInfo instanceof Transforms.RangeTransform,
+            "Table %s is non-range-partitioned, but trying to add a range 
partition",
+            loadedTable.name());
+
+        RangePartition rangePartition = (RangePartition) partition;
+        partitionValues = buildRangePartitionValues(rangePartition);
+
+        // The partition properties actually cannot be passed into Doris, we 
just return an empty
+        // map instead.
+        added =
+            Partitions.range(
+                rangePartition.name(),
+                rangePartition.upper(),
+                rangePartition.lower(),
+                Collections.emptyMap());
+      } else if (partition instanceof ListPartition) {
+        Preconditions.checkArgument(
+            partitionInfo instanceof Transforms.ListTransform,
+            "Table %s is non-list-partitioned, but trying to add a list 
partition",
+            loadedTable.name());
+
+        ListPartition listPartition = (ListPartition) partition;
+        partitionValues =
+            buildListPartitionValues(
+                listPartition, ((Transforms.ListTransform) 
partitionInfo).fieldNames().length);
+
+        added =
+            Partitions.list(listPartition.name(), listPartition.lists(), 
Collections.emptyMap());
+      } else {
+        throw new IllegalArgumentException("Unsupported partition type of 
Doris");
+      }
+
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate(
+            String.format(
+                addPartitionSqlFormat, loadedTable.name(), partition.name(), 
partitionValues));
+        return added;
+      }
+    } catch (SQLException e) {
+      throw exceptionConverter.toGravitinoException(e);
+    }
+  }
+
+  @Override
+  public boolean dropPartition(String partitionName) {
+    try (Connection connection = getConnection(loadedTable.databaseName())) {
+      String dropPartitionsSql =
+          String.format("ALTER TABLE `%s` DROP PARTITION `%s`", 
loadedTable.name(), partitionName);
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate(dropPartitionsSql);
+        return true;
+      }
+    } catch (SQLException e) {
+      GravitinoRuntimeException exception = 
exceptionConverter.toGravitinoException(e);
+      if (exception instanceof NoSuchPartitionException) {
+        return false;
+      }
+      throw exception;
+    }
+  }
+
+  private Partition fromDorisPartition(
+      ResultSet resultSet, Transform partitionInfo, Map<String, Type> 
columnTypes)
+      throws SQLException {
+    String partitionName = resultSet.getString(NAME);
+    String partitionKey = resultSet.getString(KEY);
+    String partitionValues = resultSet.getString(VALUES_RANGE);
+    ImmutableMap.Builder<String, String> propertiesBuilder = 
ImmutableMap.builder();
+    propertiesBuilder.put(ID, resultSet.getString(ID));
+    propertiesBuilder.put(VISIBLE_VERSION, 
resultSet.getString(VISIBLE_VERSION));
+    propertiesBuilder.put(VISIBLE_VERSION_TIME, 
resultSet.getString(VISIBLE_VERSION_TIME));
+    propertiesBuilder.put(STATE, resultSet.getString(STATE));
+    propertiesBuilder.put(KEY, partitionKey);
+    propertiesBuilder.put(DATA_SIZE, resultSet.getString(DATA_SIZE));
+    propertiesBuilder.put(IS_IN_MEMORY, resultSet.getString(IS_IN_MEMORY));
+    ImmutableMap<String, String> properties = propertiesBuilder.build();
+
+    String[] partitionKeys = partitionKey.split(", ");
+    if (partitionInfo instanceof Transforms.RangeTransform) {
+      if (partitionKeys.length != 1) {
+        throw new UnsupportedOperationException(
+            "Multi-column range partitioning in Doris is not supported yet");
+      }
+      Type partitionColumnType = columnTypes.get(partitionKeys[0]);
+      Literal<?> lower = Literals.NULL;
+      Literal<?> upper = Literals.NULL;
+      Matcher matcher = PARTITION_TYPE_VALUE_PATTERN.matcher(partitionValues);
+      if (matcher.find()) {
+        String lowerValue = matcher.group(2);
+        lower = Literals.of(lowerValue, partitionColumnType);
+        if (matcher.find()) {
+          String upperValue = matcher.group(2);
+          upper = Literals.of(upperValue, partitionColumnType);
+        }
+      }
+      return Partitions.range(partitionName, upper, lower, properties);
+    } else if (partitionInfo instanceof Transforms.ListTransform) {
+      Matcher matcher = PARTITION_TYPE_VALUE_PATTERN.matcher(partitionValues);
+      ImmutableList.Builder<Literal<?>[]> lists = ImmutableList.builder();
+      while (matcher.find()) {
+        String[] values = matcher.group(2).split(", ");
+        ImmutableList.Builder<Literal<?>> literValues = 
ImmutableList.builder();
+        for (int i = 0; i < values.length; i++) {
+          Type partitionColumnType = columnTypes.get(partitionKeys[i]);
+          literValues.add(Literals.of(values[i], partitionColumnType));
+        }
+        lists.add(literValues.build().toArray(new Literal<?>[0]));
+      }
+      return Partitions.list(
+          partitionName, lists.build().toArray(new Literal<?>[0][0]), 
properties);
+    } else {
+      throw new UnsupportedOperationException(
+          String.format("%s is not a partitioned table", loadedTable.name()));
+    }
+  }
+
+  private Map<String, Type> getColumnType(Connection connection) throws 
SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    try (ResultSet result =
+        metaData.getColumns(
+            connection.getCatalog(), connection.getSchema(), 
loadedTable.name(), null)) {
+      ImmutableMap.Builder<String, Type> columnTypes = ImmutableMap.builder();
+      while (result.next()) {
+        if (Objects.equals(result.getString("TABLE_NAME"), 
loadedTable.name())) {
+          JdbcTypeConverter.JdbcTypeBean typeBean =
+              new 
JdbcTypeConverter.JdbcTypeBean(result.getString("TYPE_NAME"));
+          typeBean.setColumnSize(result.getString("COLUMN_SIZE"));
+          typeBean.setScale(result.getString("DECIMAL_DIGITS"));
+          Type gravitinoType = typeConverter.toGravitino(typeBean);
+          String columnName = result.getString("COLUMN_NAME");
+          columnTypes.put(columnName, gravitinoType);
+        }
+      }
+      return columnTypes.build();
+    }
+  }
+
+  private String buildRangePartitionValues(RangePartition rangePartition) {
+    Literal<?> upper = rangePartition.upper();
+    Literal<?> lower = rangePartition.lower();
+    String partitionValues;
+    if (Literals.NULL.equals(upper) && Literals.NULL.equals(lower)) {
+      partitionValues = "LESS THAN MAXVALUE";
+    } else if (Literals.NULL.equals(lower)) {
+      partitionValues = "LESS THAN (\"" + upper.value() + "\")";
+    } else if (Literals.NULL.equals(upper)) {
+      partitionValues = "[(\"" + lower.value() + "\"), (MAXVALUE))";
+    } else {
+      partitionValues = "[(\"" + lower.value() + "\"), (\"" + upper.value() + 
"\"))";
+    }
+    return partitionValues;
+  }
+
+  private String buildListPartitionValues(ListPartition listPartition, int 
partitionedFieldNums) {
+    Literal<?>[][] lists = listPartition.lists();
+    Preconditions.checkArgument(
+        lists.length > 0, "The number of values in list partition must be 
greater than 0");
+
+    ImmutableList.Builder<String> listValues = ImmutableList.builder();
+    for (Literal<?>[] part : lists) {
+      Preconditions.checkArgument(
+          part.length == partitionedFieldNums,
+          "The number of partitioning columns must be consistent");
+
+      StringBuilder values = new StringBuilder();
+      if (part.length > 1) {
+        values
+            .append("(")
+            .append(
+                Arrays.stream(part)
+                    .map(p -> "\"" + p.value() + "\"")
+                    .collect(Collectors.joining(",")))
+            .append(")");
+      } else {
+        values.append("\"").append(part[0].value()).append("\"");
+      }
+      listValues.add(values.toString());
+    }
+    return String.format("IN (%s)", 
listValues.build().stream().collect(Collectors.joining(",")));
+  }
+}
diff --git 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
index b654f6552..2f32dc155 100644
--- 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
+++ 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
@@ -19,6 +19,9 @@
 package org.apache.gravitino.catalog.doris.integration.test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
@@ -28,7 +31,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
@@ -36,6 +41,7 @@ import org.apache.gravitino.Schema;
 import org.apache.gravitino.SupportsSchemas;
 import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
 import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.integration.test.container.ContainerSuite;
@@ -44,21 +50,27 @@ import 
org.apache.gravitino.integration.test.util.AbstractIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
 import org.apache.gravitino.integration.test.util.ITUtils;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.SupportsPartitions;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableCatalog;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.NamedReference;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.indexes.Index;
 import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.utils.RandomNameUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -127,12 +139,12 @@ public class CatalogDorisIT extends AbstractIT {
 
   private void createMetalake() {
     GravitinoMetalake[] gravitinoMetaLakes = AbstractIT.client.listMetalakes();
-    Assertions.assertEquals(0, gravitinoMetaLakes.length);
+    assertEquals(0, gravitinoMetaLakes.length);
 
     GravitinoMetalake createdMetalake =
         AbstractIT.client.createMetalake(metalakeName, "comment", 
Collections.emptyMap());
     GravitinoMetalake loadMetalake = 
AbstractIT.client.loadMetalake(metalakeName);
-    Assertions.assertEquals(createdMetalake, loadMetalake);
+    assertEquals(createdMetalake, loadMetalake);
 
     metalake = loadMetalake;
   }
@@ -160,7 +172,7 @@ public class CatalogDorisIT extends AbstractIT {
             "doris catalog comment",
             catalogProperties);
     Catalog loadCatalog = metalake.loadCatalog(catalogName);
-    Assertions.assertEquals(createdCatalog, loadCatalog);
+    assertEquals(createdCatalog, loadCatalog);
 
     catalog = loadCatalog;
   }
@@ -174,13 +186,14 @@ public class CatalogDorisIT extends AbstractIT {
 
     Schema createdSchema = catalog.asSchemas().createSchema(ident.name(), 
schema_comment, prop);
     Schema loadSchema = catalog.asSchemas().loadSchema(ident.name());
-    Assertions.assertEquals(createdSchema.name(), loadSchema.name());
+    assertEquals(createdSchema.name(), loadSchema.name());
 
-    Assertions.assertEquals(createdSchema.properties().get(propKey), 
propValue);
+    assertEquals(createdSchema.properties().get(propKey), propValue);
   }
 
   private Column[] createColumns() {
-    Column col1 = Column.of(DORIS_COL_NAME1, Types.IntegerType.get(), 
"col_1_comment");
+    Column col1 =
+        Column.of(DORIS_COL_NAME1, Types.IntegerType.get(), "col_1_comment", 
false, false, null);
     Column col2 = Column.of(DORIS_COL_NAME2, Types.VarCharType.of(10), 
"col_2_comment");
     Column col3 = Column.of(DORIS_COL_NAME3, Types.VarCharType.of(10), 
"col_3_comment");
 
@@ -203,7 +216,7 @@ public class CatalogDorisIT extends AbstractIT {
 
     // test list schemas
     String[] schemaNames = schemas.listSchemas();
-    Assertions.assertTrue(Arrays.asList(schemaNames).contains(schemaName));
+    assertTrue(Arrays.asList(schemaNames).contains(schemaName));
 
     // test create schema already exists
     String testSchemaName = 
GravitinoITUtils.genRandomName("create_schema_test");
@@ -211,29 +224,26 @@ public class CatalogDorisIT extends AbstractIT {
     schemas.createSchema(schemaIdent.name(), schema_comment, 
Collections.emptyMap());
 
     List<String> schemaNameList = Arrays.asList(schemas.listSchemas());
-    Assertions.assertTrue(schemaNameList.contains(testSchemaName));
+    assertTrue(schemaNameList.contains(testSchemaName));
 
-    Assertions.assertThrows(
+    assertThrows(
         SchemaAlreadyExistsException.class,
-        () -> {
-          schemas.createSchema(schemaIdent.name(), schema_comment, 
Collections.emptyMap());
-        });
+        () -> schemas.createSchema(schemaIdent.name(), schema_comment, 
Collections.emptyMap()));
 
     // test drop schema
-    Assertions.assertTrue(schemas.dropSchema(schemaIdent.name(), false));
+    assertTrue(schemas.dropSchema(schemaIdent.name(), false));
 
     // check schema is deleted
     // 1. check by load schema
-    Assertions.assertThrows(
-        NoSuchSchemaException.class, () -> 
schemas.loadSchema(schemaIdent.name()));
+    assertThrows(NoSuchSchemaException.class, () -> 
schemas.loadSchema(schemaIdent.name()));
 
     // 2. check by list schema
     schemaNameList = Arrays.asList(schemas.listSchemas());
-    Assertions.assertFalse(schemaNameList.contains(testSchemaName));
+    assertFalse(schemaNameList.contains(testSchemaName));
 
     // test drop schema not exists
     NameIdentifier notExistsSchemaIdent = NameIdentifier.of(metalakeName, 
catalogName, "no-exits");
-    Assertions.assertFalse(schemas.dropSchema(notExistsSchemaIdent.name(), 
false));
+    assertFalse(schemas.dropSchema(notExistsSchemaIdent.name(), false));
   }
 
   @Test
@@ -255,23 +265,19 @@ public class CatalogDorisIT extends AbstractIT {
 
     // Try to drop a database, and cascade equals to false, it should not be 
allowed.
     Throwable excep =
-        Assertions.assertThrows(
+        assertThrows(
             RuntimeException.class, () -> 
catalog.asSchemas().dropSchema(schemaName, false));
-    Assertions.assertTrue(excep.getMessage().contains("the value of cascade 
should be true."));
+    assertTrue(excep.getMessage().contains("the value of cascade should be 
true."));
 
     // Check the database still exists
     catalog.asSchemas().loadSchema(schemaName);
 
     // Try to drop a database, and cascade equals to true, it should be 
allowed.
-    Assertions.assertTrue(catalog.asSchemas().dropSchema(schemaName, true));
+    assertTrue(catalog.asSchemas().dropSchema(schemaName, true));
 
     // Check database has been dropped
     SupportsSchemas schemas = catalog.asSchemas();
-    Assertions.assertThrows(
-        NoSuchSchemaException.class,
-        () -> {
-          schemas.loadSchema(schemaName);
-        });
+    assertThrows(NoSuchSchemaException.class, () -> 
schemas.loadSchema(schemaName));
   }
 
   @Test
@@ -283,54 +289,30 @@ public class CatalogDorisIT extends AbstractIT {
 
     // should throw an exception with string that might contain SQL injection
     String sqlInjection = databaseName + "`; DROP TABLE important_table; -- ";
-    Assertions.assertThrows(
+    assertThrows(
         IllegalArgumentException.class,
-        () -> {
-          schemas.createSchema(sqlInjection, comment, properties);
-        });
-    Assertions.assertThrows(
-        IllegalArgumentException.class,
-        () -> {
-          schemas.dropSchema(sqlInjection, false);
-        });
+        () -> schemas.createSchema(sqlInjection, comment, properties));
+    assertThrows(IllegalArgumentException.class, () -> 
schemas.dropSchema(sqlInjection, false));
 
     String sqlInjection1 = databaseName + "`; SLEEP(10); -- ";
-    Assertions.assertThrows(
-        IllegalArgumentException.class,
-        () -> {
-          schemas.createSchema(sqlInjection1, comment, properties);
-        });
-    Assertions.assertThrows(
+    assertThrows(
         IllegalArgumentException.class,
-        () -> {
-          schemas.dropSchema(sqlInjection1, false);
-        });
+        () -> schemas.createSchema(sqlInjection1, comment, properties));
+    assertThrows(IllegalArgumentException.class, () -> 
schemas.dropSchema(sqlInjection1, false));
 
     String sqlInjection2 =
         databaseName + "`; UPDATE Users SET password = 'newpassword' WHERE 
username = 'admin'; -- ";
-    Assertions.assertThrows(
-        IllegalArgumentException.class,
-        () -> {
-          schemas.createSchema(sqlInjection2, comment, properties);
-        });
-    Assertions.assertThrows(
+    assertThrows(
         IllegalArgumentException.class,
-        () -> {
-          schemas.dropSchema(sqlInjection2, false);
-        });
+        () -> schemas.createSchema(sqlInjection2, comment, properties));
+    assertThrows(IllegalArgumentException.class, () -> 
schemas.dropSchema(sqlInjection2, false));
 
     // should throw an exception with input that has more than 64 characters
     String invalidInput = StringUtils.repeat("a", 65);
-    Assertions.assertThrows(
+    assertThrows(
         IllegalArgumentException.class,
-        () -> {
-          schemas.createSchema(invalidInput, comment, properties);
-        });
-    Assertions.assertThrows(
-        IllegalArgumentException.class,
-        () -> {
-          schemas.dropSchema(invalidInput, false);
-        });
+        () -> schemas.createSchema(invalidInput, comment, properties));
+    assertThrows(IllegalArgumentException.class, () -> 
schemas.dropSchema(invalidInput, false));
   }
 
   @Test
@@ -407,24 +389,20 @@ public class CatalogDorisIT extends AbstractIT {
     NameIdentifier tableIdentifier =
         NameIdentifier.of(metalakeName, catalogName, schemaName, t1_name);
 
-    Assertions.assertThrows(
-        IllegalArgumentException.class,
-        () -> {
-          tableCatalog.createTable(
-              tableIdentifier,
-              columns,
-              table_comment,
-              properties,
-              Transforms.EMPTY_TRANSFORM,
-              Distributions.NONE,
-              new SortOrder[0],
-              t1_indexes);
-        });
-    Assertions.assertThrows(
+    assertThrows(
         IllegalArgumentException.class,
-        () -> {
-          catalog.asTableCatalog().dropTable(tableIdentifier);
-        });
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier,
+                columns,
+                table_comment,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                new SortOrder[0],
+                t1_indexes));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
catalog.asTableCatalog().dropTable(tableIdentifier));
 
     String t2_name = table_name + "`; SLEEP(10); -- ";
     Column t2_col = Column.of(t2_name, Types.LongType.get(), "id", false, 
false, null);
@@ -433,24 +411,20 @@ public class CatalogDorisIT extends AbstractIT {
     NameIdentifier tableIdentifier2 =
         NameIdentifier.of(metalakeName, catalogName, schemaName, t2_name);
 
-    Assertions.assertThrows(
-        IllegalArgumentException.class,
-        () -> {
-          tableCatalog.createTable(
-              tableIdentifier2,
-              columns2,
-              table_comment,
-              properties,
-              Transforms.EMPTY_TRANSFORM,
-              Distributions.NONE,
-              new SortOrder[0],
-              t2_indexes);
-        });
-    Assertions.assertThrows(
+    assertThrows(
         IllegalArgumentException.class,
-        () -> {
-          catalog.asTableCatalog().dropTable(tableIdentifier2);
-        });
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier2,
+                columns2,
+                table_comment,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                new SortOrder[0],
+                t2_indexes));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
catalog.asTableCatalog().dropTable(tableIdentifier2));
 
     String t3_name =
         table_name + "`; UPDATE Users SET password = 'newpassword' WHERE 
username = 'admin'; -- ";
@@ -460,24 +434,20 @@ public class CatalogDorisIT extends AbstractIT {
     NameIdentifier tableIdentifier3 =
         NameIdentifier.of(metalakeName, catalogName, schemaName, t3_name);
 
-    Assertions.assertThrows(
+    assertThrows(
         IllegalArgumentException.class,
-        () -> {
-          tableCatalog.createTable(
-              tableIdentifier3,
-              columns3,
-              table_comment,
-              properties,
-              Transforms.EMPTY_TRANSFORM,
-              Distributions.NONE,
-              new SortOrder[0],
-              t3_indexes);
-        });
-    Assertions.assertThrows(
-        IllegalArgumentException.class,
-        () -> {
-          catalog.asTableCatalog().dropTable(tableIdentifier3);
-        });
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier3,
+                columns3,
+                table_comment,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                new SortOrder[0],
+                t3_indexes));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
catalog.asTableCatalog().dropTable(tableIdentifier3));
 
     String invalidInput = StringUtils.repeat("a", 65);
     Column t4_col = Column.of(invalidInput, Types.LongType.get(), "id", false, 
false, null);
@@ -486,24 +456,20 @@ public class CatalogDorisIT extends AbstractIT {
     NameIdentifier tableIdentifier4 =
         NameIdentifier.of(metalakeName, catalogName, schemaName, invalidInput);
 
-    Assertions.assertThrows(
-        IllegalArgumentException.class,
-        () -> {
-          tableCatalog.createTable(
-              tableIdentifier4,
-              columns4,
-              table_comment,
-              properties,
-              Transforms.EMPTY_TRANSFORM,
-              Distributions.NONE,
-              new SortOrder[0],
-              t4_indexes);
-        });
-    Assertions.assertThrows(
+    assertThrows(
         IllegalArgumentException.class,
-        () -> {
-          catalog.asTableCatalog().dropTable(tableIdentifier4);
-        });
+        () ->
+            tableCatalog.createTable(
+                tableIdentifier4,
+                columns4,
+                table_comment,
+                properties,
+                Transforms.EMPTY_TRANSFORM,
+                Distributions.NONE,
+                new SortOrder[0],
+                t4_indexes));
+    assertThrows(
+        IllegalArgumentException.class, () -> 
catalog.asTableCatalog().dropTable(tableIdentifier4));
   }
 
   @Test
@@ -579,9 +545,7 @@ public class CatalogDorisIT extends AbstractIT {
         .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
         .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
         .untilAsserted(
-            () ->
-                Assertions.assertEquals(
-                    4, 
tableCatalog.loadTable(tableIdentifier).columns().length));
+            () -> assertEquals(4, 
tableCatalog.loadTable(tableIdentifier).columns().length));
 
     ITUtils.assertColumn(
         Column.of("col_4", Types.VarCharType.of(255), "col_4_comment"),
@@ -598,9 +562,7 @@ public class CatalogDorisIT extends AbstractIT {
         .atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
         .pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
         .untilAsserted(
-            () ->
-                Assertions.assertEquals(
-                    3, 
tableCatalog.loadTable(tableIdentifier).columns().length));
+            () -> assertEquals(3, 
tableCatalog.loadTable(tableIdentifier).columns().length));
   }
 
   @Test
@@ -623,7 +585,7 @@ public class CatalogDorisIT extends AbstractIT {
             Transforms.EMPTY_TRANSFORM,
             distribution,
             null);
-    Assertions.assertEquals(createdTable.name(), tableName);
+    assertEquals(createdTable.name(), tableName);
 
     // add index test.
     tableCatalog.alterTable(
@@ -662,4 +624,225 @@ public class CatalogDorisIT extends AbstractIT {
                         .index()
                         .length));
   }
+
+  @Test
+  void testDorisTablePartitionOperation() {
+    // create a partitioned table
+    String tableName = 
GravitinoITUtils.genRandomName("test_partitioned_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Distribution distribution = createDistribution();
+    Index[] indexes =
+        new Index[] {
+          Indexes.of(Index.IndexType.PRIMARY_KEY, "k1_index", new String[][] 
{{DORIS_COL_NAME1}})
+        };
+    Map<String, String> properties = createTableProperties();
+    Transform[] partitioning = {Transforms.list(new String[][] 
{{DORIS_COL_NAME1}})};
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            null,
+            indexes);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        indexes,
+        partitioning,
+        createdTable);
+
+    // load table
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        indexes,
+        partitioning,
+        loadTable);
+
+    // get table partition operations
+    SupportsPartitions tablePartitionOperations = 
loadTable.supportPartitions();
+
+    // assert partition info when there is no partitions actually
+    String[] emptyPartitionNames = 
tablePartitionOperations.listPartitionNames();
+    assertEquals(0, emptyPartitionNames.length);
+    Partition[] emptyPartitions = tablePartitionOperations.listPartitions();
+    assertEquals(0, emptyPartitions.length);
+
+    // get non-existing partition
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p1"));
+
+    // add partition with incorrect type
+    Partition incorrectType =
+        Partitions.range("p1", Literals.NULL, Literals.NULL, 
Collections.emptyMap());
+    assertThrows(
+        IllegalArgumentException.class, () -> 
tablePartitionOperations.addPartition(incorrectType));
+
+    // add partition with incorrect value
+    Partition incorrectValue =
+        Partitions.list(
+            "p1", new Literal[][] {{Literals.NULL, Literals.NULL}}, 
Collections.emptyMap());
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> tablePartitionOperations.addPartition(incorrectValue));
+
+    // add partition
+    Literal[][] p1Values = {{Literals.integerLiteral(1)}};
+    Literal[][] p2Values = {{Literals.integerLiteral(2)}};
+    Literal[][] p3Values = {{Literals.integerLiteral(3)}};
+    ListPartition p1 = Partitions.list("p1", p1Values, Collections.emptyMap());
+    ListPartition p2 = Partitions.list("p2", p2Values, Collections.emptyMap());
+    ListPartition p3 = Partitions.list("p3", p3Values, Collections.emptyMap());
+    ListPartition p1Added = (ListPartition) 
tablePartitionOperations.addPartition(p1);
+    assertEquals("p1", p1Added.name());
+    assertEquals(1, p1Added.lists().length);
+    assertEquals(1, p1Added.lists()[0].length);
+    assertEquals("1", p1Added.lists()[0][0].value());
+    assertEquals(Types.IntegerType.get(), p1Added.lists()[0][0].dataType());
+    ListPartition p2Added = (ListPartition) 
tablePartitionOperations.addPartition(p2);
+    assertEquals("p2", p2Added.name());
+    assertEquals(1, p2Added.lists().length);
+    assertEquals(1, p2Added.lists()[0].length);
+    assertEquals("2", p2Added.lists()[0][0].value());
+    assertEquals(Types.IntegerType.get(), p2Added.lists()[0][0].dataType());
+    ListPartition p3Added = (ListPartition) 
tablePartitionOperations.addPartition(p3);
+    assertEquals("p3", p3Added.name());
+    assertEquals(1, p3Added.lists().length);
+    assertEquals(1, p3Added.lists()[0].length);
+    assertEquals("3", p3Added.lists()[0][0].value());
+    assertEquals(Types.IntegerType.get(), p3Added.lists()[0][0].dataType());
+
+    // check partitions
+    Set<String> partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(3, partitionNames.size());
+    assertTrue(partitionNames.contains("p1"));
+    assertTrue(partitionNames.contains("p2"));
+    assertTrue(partitionNames.contains("p3"));
+
+    Map<String, ListPartition> partitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(p -> p.name(), p -> (ListPartition) p));
+    assertEquals(3, partitions.size());
+    ListPartition actualP1 = partitions.get("p1");
+    assertEquals("p1", actualP1.name());
+    assertEquals(1, actualP1.lists().length);
+    assertEquals(1, actualP1.lists()[0].length);
+    assertEquals("1", actualP1.lists()[0][0].value());
+    assertEquals(Types.IntegerType.get(), actualP1.lists()[0][0].dataType());
+    ListPartition actualP2 = partitions.get("p2");
+    assertEquals("p2", actualP2.name());
+    assertEquals(1, actualP2.lists().length);
+    assertEquals(1, actualP2.lists()[0].length);
+    assertEquals("2", actualP2.lists()[0][0].value());
+    assertEquals(Types.IntegerType.get(), actualP2.lists()[0][0].dataType());
+    ListPartition actualP3 = partitions.get("p3");
+    assertEquals("p3", actualP3.name());
+    assertEquals(1, actualP3.lists().length);
+    assertEquals(1, actualP3.lists()[0].length);
+    assertEquals("3", actualP3.lists()[0][0].value());
+    assertEquals(Types.IntegerType.get(), actualP3.lists()[0][0].dataType());
+
+    actualP1 = (ListPartition) tablePartitionOperations.getPartition("p1");
+    assertEquals("p1", actualP1.name());
+    assertEquals(1, actualP1.lists().length);
+    assertEquals(1, actualP1.lists()[0].length);
+    assertEquals("1", actualP1.lists()[0][0].value());
+    assertEquals(Types.IntegerType.get(), actualP1.lists()[0][0].dataType());
+    actualP2 = (ListPartition) tablePartitionOperations.getPartition("p2");
+    assertEquals("p2", actualP2.name());
+    assertEquals(1, actualP2.lists().length);
+    assertEquals(1, actualP2.lists()[0].length);
+    assertEquals("2", actualP2.lists()[0][0].value());
+    assertEquals(Types.IntegerType.get(), actualP2.lists()[0][0].dataType());
+    actualP3 = (ListPartition) tablePartitionOperations.getPartition("p3");
+    assertEquals("p3", actualP3.name());
+    assertEquals(1, actualP3.lists().length);
+    assertEquals(1, actualP3.lists()[0].length);
+    assertEquals("3", actualP3.lists()[0][0].value());
+    assertEquals(Types.IntegerType.get(), actualP3.lists()[0][0].dataType());
+
+    // drop partition
+    assertTrue(tablePartitionOperations.dropPartition("p3"));
+    partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(2, partitionNames.size());
+    assertFalse(partitionNames.contains("p3"));
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p3"));
+
+    // drop non-existing partition
+    assertFalse(tablePartitionOperations.dropPartition("p3"));
+  }
+
+  @Test
+  void testNonPartitionedTable() {
+    // create a non-partitioned table
+    String tableName = 
GravitinoITUtils.genRandomName("test_non_partitioned_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Distribution distribution = createDistribution();
+    Index[] indexes = Indexes.EMPTY_INDEXES;
+    Map<String, String> properties = createTableProperties();
+    Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            null,
+            indexes);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        indexes,
+        partitioning,
+        createdTable);
+
+    // load table
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName,
+        table_comment,
+        Arrays.asList(columns),
+        properties,
+        indexes,
+        partitioning,
+        loadTable);
+
+    // get table partition operations
+    SupportsPartitions tablePartitionOperations = 
loadTable.supportPartitions();
+
+    assertThrows(
+        UnsupportedOperationException.class, () -> 
tablePartitionOperations.listPartitionNames());
+
+    assertThrows(
+        UnsupportedOperationException.class, () -> 
tablePartitionOperations.listPartitions());
+
+    assertThrows(
+        UnsupportedOperationException.class,
+        () ->
+            tablePartitionOperations.addPartition(
+                Partitions.range("p1", Literals.NULL, Literals.NULL, 
Collections.emptyMap())));
+
+    assertThrows(
+        UnsupportedOperationException.class, () -> 
tablePartitionOperations.getPartition("p1"));
+
+    assertThrows(
+        UnsupportedOperationException.class, () -> 
tablePartitionOperations.dropPartition("p1"));
+  }
 }
diff --git 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDoris.java
 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDoris.java
index 3f24e75f2..d93f6c817 100644
--- 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDoris.java
+++ 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDoris.java
@@ -21,7 +21,6 @@ package org.apache.gravitino.catalog.doris.operation;
 import com.google.common.collect.Maps;
 import java.util.Collections;
 import java.util.Map;
-import javax.sql.DataSource;
 import 
org.apache.gravitino.catalog.doris.converter.DorisColumnDefaultValueConverter;
 import org.apache.gravitino.catalog.doris.converter.DorisExceptionConverter;
 import org.apache.gravitino.catalog.doris.converter.DorisTypeConverter;
@@ -41,14 +40,14 @@ public class TestDoris extends TestJdbc {
   public static void startup() {
     containerSuite.startDorisContainer();
 
-    DataSource dataSource = 
DataSourceUtils.createDataSource(getDorisCatalogProperties());
+    DATA_SOURCE = 
DataSourceUtils.createDataSource(getDorisCatalogProperties());
 
     DATABASE_OPERATIONS = new DorisDatabaseOperations();
     TABLE_OPERATIONS = new DorisTableOperations();
     JDBC_EXCEPTION_CONVERTER = new DorisExceptionConverter();
-    DATABASE_OPERATIONS.initialize(dataSource, JDBC_EXCEPTION_CONVERTER, 
Collections.emptyMap());
+    DATABASE_OPERATIONS.initialize(DATA_SOURCE, JDBC_EXCEPTION_CONVERTER, 
Collections.emptyMap());
     TABLE_OPERATIONS.initialize(
-        dataSource,
+        DATA_SOURCE,
         JDBC_EXCEPTION_CONVERTER,
         new DorisTypeConverter(),
         new DorisColumnDefaultValueConverter(),
diff --git 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDorisTablePartitionOperations.java
 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDorisTablePartitionOperations.java
new file mode 100644
index 000000000..0494082bd
--- /dev/null
+++ 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDorisTablePartitionOperations.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.doris.operation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableMap;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.catalog.doris.converter.DorisTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import 
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-it")
+public class TestDorisTablePartitionOperations extends TestDoris {
+  private static final String databaseName = 
GravitinoITUtils.genRandomName("doris_test_db");
+  private static final Integer DEFAULT_BUCKET_SIZE = 1;
+  private static final JdbcTypeConverter TYPE_CONVERTER = new 
DorisTypeConverter();
+
+  @BeforeAll
+  public static void startup() {
+    TestDoris.startup();
+    createDatabase();
+  }
+
+  private static void createDatabase() {
+    DATABASE_OPERATIONS.create(databaseName, "test_comment", new HashMap<>());
+  }
+
+  private static Map<String, String> createProperties() {
+    return ImmutableMap.of("replication_allocation", "tag.location.default: 
1");
+  }
+
+  @Test
+  public void testRangePartition() {
+    String tableComment = "range_partitioned_table_comment";
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        
JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build();
+    JdbcColumn col3 =
+        
JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build();
+    JdbcColumn col4 =
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build();
+    List<JdbcColumn> columns = Arrays.asList(col1, col2, col3, col4);
+    Distribution distribution =
+        Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
+    Index[] indexes = new Index[] {};
+    String rangePartitionTableName = 
GravitinoITUtils.genRandomName("range_partition_table");
+    Transform[] rangePartition = new Transform[] {Transforms.range(new 
String[] {col4.name()})};
+    TABLE_OPERATIONS.create(
+        databaseName,
+        rangePartitionTableName,
+        columns.toArray(new JdbcColumn[] {}),
+        tableComment,
+        createProperties(),
+        rangePartition,
+        distribution,
+        indexes);
+
+    // assert table info
+    JdbcTable rangePartitionTable = TABLE_OPERATIONS.load(databaseName, 
rangePartitionTableName);
+    assertionsTableInfo(
+        rangePartitionTableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        rangePartition,
+        rangePartitionTable);
+    List<String> listTables = TABLE_OPERATIONS.listTables(databaseName);
+    assertTrue(listTables.contains(rangePartitionTableName));
+
+    // create Table Partition Operations manually
+    JdbcTablePartitionOperations tablePartitionOperations =
+        new DorisTablePartitionOperations(
+            DATA_SOURCE, rangePartitionTable, JDBC_EXCEPTION_CONVERTER, 
TYPE_CONVERTER);
+
+    // assert partition info when there is no partitions actually
+    String[] emptyPartitionNames = 
tablePartitionOperations.listPartitionNames();
+    assertEquals(0, emptyPartitionNames.length);
+    Partition[] emptyPartitions = tablePartitionOperations.listPartitions();
+    assertEquals(0, emptyPartitions.length);
+
+    // get non-existing partition
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p1"));
+
+    // add partition with incorrect type
+    Partition incorrect =
+        Partitions.list("test_incorrect", new Literal[][] {{Literals.NULL}}, 
null);
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class, () -> 
tablePartitionOperations.addPartition(incorrect));
+    assertEquals(
+        "Table "
+            + rangePartitionTableName
+            + " is non-list-partitioned, but trying to add a list partition",
+        exception.getMessage());
+
+    // add different kinds of range partitions
+    LocalDate today = LocalDate.now();
+    LocalDate tomorrow = today.plusDays(1);
+    Literal<LocalDate> todayLiteral = Literals.dateLiteral(today);
+    Literal<LocalDate> tomorrowLiteral = Literals.dateLiteral(tomorrow);
+    Partition p1 = Partitions.range("p1", todayLiteral, Literals.NULL, 
Collections.emptyMap());
+    Partition p2 = Partitions.range("p2", tomorrowLiteral, todayLiteral, 
Collections.emptyMap());
+    Partition p3 = Partitions.range("p3", Literals.NULL, tomorrowLiteral, 
Collections.emptyMap());
+    assertEquals(p1, tablePartitionOperations.addPartition(p1));
+    assertEquals(p2, tablePartitionOperations.addPartition(p2));
+    assertEquals(p3, tablePartitionOperations.addPartition(p3));
+
+    // add partition with same name
+    Partition p4 = Partitions.range("p3", Literals.NULL, Literals.NULL, 
Collections.emptyMap());
+    assertThrows(
+        PartitionAlreadyExistsException.class, () -> 
tablePartitionOperations.addPartition(p4));
+
+    // check partitions
+    Set<String> partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(3, partitionNames.size());
+    assertTrue(partitionNames.contains("p1"));
+    assertTrue(partitionNames.contains("p2"));
+    assertTrue(partitionNames.contains("p3"));
+
+    Map<String, RangePartition> partitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(p -> p.name(), p -> (RangePartition) p));
+    assertEquals(3, partitions.size());
+    RangePartition actualP1 = partitions.get("p1");
+    assertEquals(todayLiteral, actualP1.upper());
+    assertEquals(Literals.of("0000-01-01", Types.DateType.get()), 
actualP1.lower());
+    RangePartition actualP2 = partitions.get("p2");
+    assertEquals(tomorrowLiteral, actualP2.upper());
+    assertEquals(todayLiteral, actualP2.lower());
+    RangePartition actualP3 = partitions.get("p3");
+    assertEquals(Literals.of("MAXVALUE", Types.DateType.get()), 
actualP3.upper());
+    assertEquals(tomorrowLiteral, actualP3.lower());
+
+    actualP1 = (RangePartition) tablePartitionOperations.getPartition("p1");
+    assertEquals(todayLiteral, actualP1.upper());
+    assertEquals(Literals.of("0000-01-01", Types.DateType.get()), 
actualP1.lower());
+    actualP2 = (RangePartition) tablePartitionOperations.getPartition("p2");
+    assertEquals(tomorrowLiteral, actualP2.upper());
+    assertEquals(todayLiteral, actualP2.lower());
+    actualP3 = (RangePartition) tablePartitionOperations.getPartition("p3");
+    assertEquals(Literals.of("MAXVALUE", Types.DateType.get()), 
actualP3.upper());
+    assertEquals(tomorrowLiteral, actualP3.lower());
+
+    // drop partition
+    assertTrue(tablePartitionOperations.dropPartition("p3"));
+    partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(2, partitionNames.size());
+    assertFalse(partitionNames.contains("p3"));
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p3"));
+
+    // drop non-existing partition
+    assertFalse(tablePartitionOperations.dropPartition("p3"));
+  }
+
+  @Test
+  public void testListPartition() {
+    String tableComment = "list_partitioned_table_comment";
+    JdbcColumn col1 =
+        JdbcColumn.builder()
+            .withName("col_1")
+            .withType(Types.IntegerType.get())
+            .withNullable(false)
+            .build();
+    JdbcColumn col2 =
+        
JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build();
+    JdbcColumn col3 =
+        
JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build();
+    JdbcColumn col4 =
+        JdbcColumn.builder()
+            .withName("col_4")
+            .withType(Types.DateType.get())
+            .withNullable(false)
+            .build();
+    List<JdbcColumn> columns = Arrays.asList(col1, col2, col3, col4);
+    Distribution distribution =
+        Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
+    Index[] indexes = new Index[] {};
+    String listPartitionTableName = 
GravitinoITUtils.genRandomName("list_partition_table");
+    Transform[] listPartition =
+        new Transform[] {Transforms.list(new String[][] {{col1.name()}, 
{col4.name()}})};
+    TABLE_OPERATIONS.create(
+        databaseName,
+        listPartitionTableName,
+        columns.toArray(new JdbcColumn[] {}),
+        tableComment,
+        createProperties(),
+        listPartition,
+        distribution,
+        indexes);
+
+    // assert table info
+    JdbcTable listPartitionTable = TABLE_OPERATIONS.load(databaseName, 
listPartitionTableName);
+    assertionsTableInfo(
+        listPartitionTableName,
+        tableComment,
+        columns,
+        Collections.emptyMap(),
+        null,
+        listPartition,
+        listPartitionTable);
+    List<String> listTables = TABLE_OPERATIONS.listTables(databaseName);
+    assertTrue(listTables.contains(listPartitionTableName));
+
+    // create Table Partition Operations manually
+    JdbcTablePartitionOperations tablePartitionOperations =
+        new DorisTablePartitionOperations(
+            DATA_SOURCE, listPartitionTable, JDBC_EXCEPTION_CONVERTER, 
TYPE_CONVERTER);
+
+    // assert partition info when there is no partitions actually
+    String[] emptyPartitionNames = 
tablePartitionOperations.listPartitionNames();
+    assertEquals(0, emptyPartitionNames.length);
+    Partition[] emptyPartitions = tablePartitionOperations.listPartitions();
+    assertEquals(0, emptyPartitions.length);
+
+    // get non-existing partition
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p1"));
+
+    // add partition with incorrect type
+    Partition incorrectType =
+        Partitions.range("p1", Literals.NULL, Literals.NULL, 
Collections.emptyMap());
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> tablePartitionOperations.addPartition(incorrectType));
+    assertEquals(
+        "Table "
+            + listPartitionTableName
+            + " is non-range-partitioned, but trying to add a range partition",
+        exception.getMessage());
+
+    // add partition with incorrect value
+    Partition incorrectValue =
+        Partitions.list("p1", new Literal[][] {{Literals.NULL}}, 
Collections.emptyMap());
+    exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> tablePartitionOperations.addPartition(incorrectValue));
+    assertEquals("The number of partitioning columns must be consistent", 
exception.getMessage());
+
+    // add different kinds of list partitions
+    LocalDate today = LocalDate.now();
+    LocalDate tomorrow = today.plusDays(1);
+    Literal<LocalDate> todayLiteral = Literals.dateLiteral(today);
+    Literal<LocalDate> tomorrowLiteral = Literals.dateLiteral(tomorrow);
+    Literal[][] p1Values = {{Literals.integerLiteral(1), todayLiteral}};
+    Literal[][] p2Values = {{Literals.integerLiteral(2), todayLiteral}};
+    Literal[][] p3Values = {{Literals.integerLiteral(1), tomorrowLiteral}};
+    Literal[][] p4Values = {{Literals.integerLiteral(2), tomorrowLiteral}};
+    Partition p1 = Partitions.list("p1", p1Values, Collections.emptyMap());
+    Partition p2 = Partitions.list("p2", p2Values, Collections.emptyMap());
+    Partition p3 = Partitions.list("p3", p3Values, Collections.emptyMap());
+    Partition p4 = Partitions.list("p4", p4Values, Collections.emptyMap());
+    assertEquals(p1, tablePartitionOperations.addPartition(p1));
+    assertEquals(p2, tablePartitionOperations.addPartition(p2));
+    assertEquals(p3, tablePartitionOperations.addPartition(p3));
+    assertEquals(p4, tablePartitionOperations.addPartition(p4));
+
+    // check partitions
+    Set<String> partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(4, partitionNames.size());
+    assertTrue(partitionNames.contains("p1"));
+    assertTrue(partitionNames.contains("p2"));
+    assertTrue(partitionNames.contains("p3"));
+    assertTrue(partitionNames.contains("p4"));
+
+    Map<String, ListPartition> partitions =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(p -> p.name(), p -> (ListPartition) p));
+    assertEquals(4, partitions.size());
+    ListPartition actualP1 = partitions.get("p1");
+    assertTrue(Arrays.deepEquals(actualP1.lists(), p1Values));
+    ListPartition actualP2 = partitions.get("p2");
+    assertTrue(Arrays.deepEquals(actualP2.lists(), p2Values));
+    ListPartition actualP3 = partitions.get("p3");
+    assertTrue(Arrays.deepEquals(actualP3.lists(), p3Values));
+    ListPartition actualP4 = partitions.get("p4");
+    assertTrue(Arrays.deepEquals(actualP4.lists(), p4Values));
+
+    actualP1 = (ListPartition) tablePartitionOperations.getPartition("p1");
+    assertTrue(Arrays.deepEquals(actualP1.lists(), p1Values));
+    actualP2 = (ListPartition) tablePartitionOperations.getPartition("p2");
+    assertTrue(Arrays.deepEquals(actualP2.lists(), p2Values));
+    actualP3 = (ListPartition) tablePartitionOperations.getPartition("p3");
+    assertTrue(Arrays.deepEquals(actualP3.lists(), p3Values));
+    actualP4 = (ListPartition) tablePartitionOperations.getPartition("p4");
+    assertTrue(Arrays.deepEquals(actualP4.lists(), p4Values));
+
+    // drop partition
+    assertTrue(tablePartitionOperations.dropPartition("p3"));
+    partitionNames =
+        
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(3, partitionNames.size());
+    assertFalse(partitionNames.contains("p3"));
+    assertThrows(NoSuchPartitionException.class, () -> 
tablePartitionOperations.getPartition("p3"));
+
+    // drop non-existing partition
+    assertFalse(tablePartitionOperations.dropPartition("p3"));
+  }
+}
diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseTable.java 
b/core/src/main/java/org/apache/gravitino/connector/BaseTable.java
index 0b0f5914c..17b2f645a 100644
--- a/core/src/main/java/org/apache/gravitino/connector/BaseTable.java
+++ b/core/src/main/java/org/apache/gravitino/connector/BaseTable.java
@@ -77,12 +77,7 @@ public abstract class BaseTable implements Table {
         if (ops == null) {
           TableOperations newOps = newOps();
           ops =
-              proxyPlugin
-                  .map(
-                      plugin -> {
-                        return OperationsProxy.createProxy(newOps, plugin);
-                      })
-                  .orElse(newOps);
+              proxyPlugin.map(plugin -> OperationsProxy.createProxy(newOps, 
plugin)).orElse(newOps);
         }
       }
     }

Reply via email to