This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/gravitino.git

commit 8cdfda2c6e0e542e77b2fd7307b7664fccb4a708
Author: Jarvis <[email protected]>
AuthorDate: Wed Jul 30 17:01:33 2025 +0800

    [#3302][Sub-Task] StarRocks catalog Partition ops (#7791)
    
    <!--
    1. Title: [#<issue>] <type>(<scope>): <subject>
       Examples:
         - "[#123] feat(operator): support xxx"
         - "[#233] fix: check null before access result in xxx"
         - "[MINOR] refactor: fix typo in variable name"
         - "[MINOR] docs: fix typo in README"
         - "[#255] test: fix flaky test NameOfTheTest"
       Reference: https://www.conventionalcommits.org/en/v1.0.0/
    2. If the PR is unfinished, please mark this PR as draft.
    -->
    
    ### What changes were proposed in this pull request?
    
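    Implement the StarRocks catalog partition operations (list partition names,
    list partitions, get partition, add partition, and drop partition), and fix
    a code comment that referred to Doris instead of StarRocks.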
    
    ### Why are the changes needed?
    
    To support partition management in the StarRocks catalog; these operations
    previously threw NotImplementedException.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    By E2E tests, which are added in a separate PR:
    
    https://github.com/apache/gravitino/pull/7792
---
 .../StarRocksColumnDefaultValueConverter.java      |   2 +-
 .../StarRocksTablePartitionOperations.java         | 176 ++++++++++++++++++++-
 2 files changed, 169 insertions(+), 9 deletions(-)

diff --git 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
index a9f8c5bd86..d449f29b1a 100644
--- 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
+++ 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
@@ -62,7 +62,7 @@ public class StarRocksColumnDefaultValueConverter extends 
JdbcColumnDefaultValue
       if (columnDefaultValue.equals(CURRENT_TIMESTAMP)) {
         return DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
       }
-      // The parsing of Doris expressions is complex, so we are not currently 
undertaking the
+      // The parsing of StarRocks expressions is complex, so we are not 
currently undertaking the
       // parsing.
       return UnparsedExpression.of(columnDefaultValue);
     }
diff --git 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java
 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java
index 97d038b547..61d534f916 100644
--- 
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java
+++ 
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java
@@ -20,23 +20,46 @@ package org.apache.gravitino.catalog.starrocks.operations;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
 import javax.sql.DataSource;
-import org.apache.commons.lang3.NotImplementedException;
 import org.apache.gravitino.catalog.jdbc.JdbcTable;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
 import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
 import 
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
+import org.apache.gravitino.catalog.starrocks.utils.StarRocksUtils;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
 import org.apache.gravitino.exceptions.NoSuchPartitionException;
 import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.partitions.ListPartition;
 import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Table partition operations for StarRocks. */
 public final class StarRocksTablePartitionOperations extends 
JdbcTablePartitionOperations {
 
-  @SuppressWarnings("unused")
+  private static final Logger log =
+      LoggerFactory.getLogger(StarRocksTablePartitionOperations.class);
+
   private final JdbcExceptionConverter exceptionConverter;
 
-  @SuppressWarnings("unused")
   private final JdbcTypeConverter typeConverter;
 
   public StarRocksTablePartitionOperations(
@@ -53,26 +76,163 @@ public final class StarRocksTablePartitionOperations 
extends JdbcTablePartitionO
 
   @Override
   public String[] listPartitionNames() {
-    throw new NotImplementedException("To be implemented in the future");
+    try (Connection connection = getConnection(loadedTable.databaseName())) {
+      String showPartitionsSql = String.format("SHOW PARTITIONS FROM `%s`", 
loadedTable.name());
+      try (Statement statement = connection.createStatement();
+          ResultSet result = statement.executeQuery(showPartitionsSql)) {
+        ImmutableList.Builder<String> partitionNames = ImmutableList.builder();
+        while (result.next()) {
+          partitionNames.add(result.getString("PartitionName"));
+        }
+        return partitionNames.build().toArray(new String[0]);
+      }
+    } catch (SQLException e) {
+      throw exceptionConverter.toGravitinoException(e);
+    }
   }
 
   @Override
   public Partition[] listPartitions() {
-    throw new NotImplementedException("To be implemented in the future");
+    try (Connection connection = getConnection(loadedTable.databaseName())) {
+      Transform partitionInfo = loadedTable.partitioning()[0];
+      Map<String, Type> columnTypes = getColumnType(connection);
+      String showPartitionsSql = String.format("SHOW PARTITIONS FROM `%s`", 
loadedTable.name());
+      try (Statement statement = connection.createStatement();
+          ResultSet result = statement.executeQuery(showPartitionsSql)) {
+        ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
+        while (result.next()) {
+          partitions.add(
+              StarRocksUtils.fromStarRocksPartition(
+                  loadedTable.name(), result, partitionInfo, columnTypes));
+        }
+        return partitions.build().toArray(new Partition[0]);
+      }
+    } catch (SQLException e) {
+      throw exceptionConverter.toGravitinoException(e);
+    }
   }
 
   @Override
   public Partition getPartition(String partitionName) throws 
NoSuchPartitionException {
-    throw new NotImplementedException("To be implemented in the future");
+    try (Connection connection = getConnection(loadedTable.databaseName())) {
+      Transform partitionInfo = loadedTable.partitioning()[0];
+      Map<String, Type> columnTypes = getColumnType(connection);
+      String showPartitionsSql =
+          String.format(
+              "SHOW PARTITIONS FROM `%s` WHERE PartitionName = \"%s\"",
+              loadedTable.name(), partitionName);
+      try (Statement statement = connection.createStatement();
+          ResultSet result = statement.executeQuery(showPartitionsSql)) {
+        if (result.next()) {
+          return StarRocksUtils.fromStarRocksPartition(
+              loadedTable.name(), result, partitionInfo, columnTypes);
+        }
+      }
+    } catch (SQLException e) {
+      throw exceptionConverter.toGravitinoException(e);
+    }
+    throw new NoSuchPartitionException("Partition %s does not exist", 
partitionName);
   }
 
   @Override
   public Partition addPartition(Partition partition) throws 
PartitionAlreadyExistsException {
-    throw new NotImplementedException("To be implemented in the future");
+    try (Connection connection = getConnection(loadedTable.databaseName())) {
+      Transform partitionInfo = loadedTable.partitioning()[0];
+
+      String addPartitionSqlFormat = "ALTER TABLE `%s` ADD %s";
+      String partitionSqlFragment;
+      Partition added;
+
+      if (partition instanceof RangePartition) {
+        Preconditions.checkArgument(
+            partitionInfo instanceof Transforms.RangeTransform,
+            "Table %s is non-range-partitioned, but trying to add a range 
partition",
+            loadedTable.name());
+
+        RangePartition rangePartition = (RangePartition) partition;
+        partitionSqlFragment = 
StarRocksUtils.generatePartitionSqlFragment(rangePartition);
+
+        // The partition properties actually cannot be passed into StarRocks, 
we just return an
+        // empty
+        // map instead.
+        added =
+            Partitions.range(
+                rangePartition.name(),
+                rangePartition.upper(),
+                rangePartition.lower(),
+                Collections.emptyMap());
+      } else if (partition instanceof ListPartition) {
+        Preconditions.checkArgument(
+            partitionInfo instanceof Transforms.ListTransform,
+            "Table %s is non-list-partitioned, but trying to add a list 
partition",
+            loadedTable.name());
+
+        ListPartition listPartition = (ListPartition) partition;
+        Literal<?>[][] lists = listPartition.lists();
+        Preconditions.checkArgument(
+            lists.length > 0, "The number of values in list partition must be 
greater than 0");
+        Preconditions.checkArgument(
+            Arrays.stream(lists)
+                .allMatch(
+                    part ->
+                        part.length
+                            == ((Transforms.ListTransform) 
partitionInfo).fieldNames().length),
+            "The number of partitioning columns must be consistent");
+
+        partitionSqlFragment = 
StarRocksUtils.generatePartitionSqlFragment(listPartition);
+
+        added =
+            Partitions.list(listPartition.name(), listPartition.lists(), 
Collections.emptyMap());
+      } else {
+        throw new IllegalArgumentException("Unsupported partition type of 
StarRocks");
+      }
+      log.info("Generated add partition sql : {}", partitionSqlFragment);
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate(
+            String.format(addPartitionSqlFormat, loadedTable.name(), 
partitionSqlFragment));
+        return added;
+      }
+    } catch (SQLException e) {
+      throw exceptionConverter.toGravitinoException(e);
+    }
   }
 
   @Override
   public boolean dropPartition(String partitionName) {
-    throw new NotImplementedException("To be implemented in the future");
+    try (Connection connection = getConnection(loadedTable.databaseName())) {
+      String dropPartitionsSql =
+          String.format("ALTER TABLE `%s` DROP PARTITION `%s`", 
loadedTable.name(), partitionName);
+      try (Statement statement = connection.createStatement()) {
+        statement.executeUpdate(dropPartitionsSql);
+        return true;
+      }
+    } catch (SQLException e) {
+      GravitinoRuntimeException exception = 
exceptionConverter.toGravitinoException(e);
+      if (exception instanceof NoSuchPartitionException) {
+        return false;
+      }
+      throw exception;
+    }
+  }
+
+  private Map<String, Type> getColumnType(Connection connection) throws 
SQLException {
+    DatabaseMetaData metaData = connection.getMetaData();
+    try (ResultSet result =
+        metaData.getColumns(
+            connection.getCatalog(), connection.getSchema(), 
loadedTable.name(), null)) {
+      ImmutableMap.Builder<String, Type> columnTypes = ImmutableMap.builder();
+      while (result.next()) {
+        if (Objects.equals(result.getString("TABLE_NAME"), 
loadedTable.name())) {
+          JdbcTypeConverter.JdbcTypeBean typeBean =
+              new 
JdbcTypeConverter.JdbcTypeBean(result.getString("TYPE_NAME"));
+          typeBean.setColumnSize(result.getInt("COLUMN_SIZE"));
+          typeBean.setScale(result.getInt("DECIMAL_DIGITS"));
+          Type gravitinoType = typeConverter.toGravitino(typeBean);
+          String columnName = result.getString("COLUMN_NAME");
+          columnTypes.put(columnName, gravitinoType);
+        }
+      }
+      return columnTypes.build();
+    }
   }
 }

Reply via email to