This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 8b4d86ab5c [#3302][Sub-Task] StarRocks catalog Partition ops (#7791)
8b4d86ab5c is described below
commit 8b4d86ab5cb7da8b85c61c2c5049362a0c42cb14
Author: Jarvis <[email protected]>
AuthorDate: Wed Jul 30 17:01:33 2025 +0800
[#3302][Sub-Task] StarRocks catalog Partition ops (#7791)
<!--
1. Title: [#<issue>] <type>(<scope>): <subject>
Examples:
- "[#123] feat(operator): support xxx"
- "[#233] fix: check null before access result in xxx"
- "[MINOR] refactor: fix typo in variable name"
- "[MINOR] docs: fix typo in README"
- "[#255] test: fix flaky test NameOfTheTest"
Reference: https://www.conventionalcommits.org/en/v1.0.0/
2. If the PR is unfinished, please mark this PR as draft.
-->
### What changes were proposed in this pull request?
Add the partition operations (list, get, add and drop) to the StarRocks catalog implementation.
### Why are the changes needed?
To support StarRocks Catalog.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested with E2E tests, which are submitted in a separate PR:
https://github.com/apache/gravitino/pull/7792
---
.../StarRocksColumnDefaultValueConverter.java | 2 +-
.../StarRocksTablePartitionOperations.java | 176 ++++++++++++++++++++-
2 files changed, 169 insertions(+), 9 deletions(-)
diff --git
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
index a9f8c5bd86..d449f29b1a 100644
---
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
+++
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/converter/StarRocksColumnDefaultValueConverter.java
@@ -62,7 +62,7 @@ public class StarRocksColumnDefaultValueConverter extends
JdbcColumnDefaultValue
if (columnDefaultValue.equals(CURRENT_TIMESTAMP)) {
return DEFAULT_VALUE_OF_CURRENT_TIMESTAMP;
}
- // The parsing of Doris expressions is complex, so we are not currently
undertaking the
+ // The parsing of StarRocks expressions is complex, so we are not
currently undertaking the
// parsing.
return UnparsedExpression.of(columnDefaultValue);
}
diff --git
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java
index 97d038b547..61d534f916 100644
---
a/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java
+++
b/catalogs/catalog-jdbc-starrocks/src/main/java/org/apache/gravitino/catalog/starrocks/operations/StarRocksTablePartitionOperations.java
@@ -20,23 +20,46 @@ package org.apache.gravitino.catalog.starrocks.operations;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
import javax.sql.DataSource;
-import org.apache.commons.lang3.NotImplementedException;
import org.apache.gravitino.catalog.jdbc.JdbcTable;
import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
+import org.apache.gravitino.catalog.starrocks.utils.StarRocksUtils;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchPartitionException;
import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.partitions.ListPartition;
import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Table partition operations for StarRocks. */
public final class StarRocksTablePartitionOperations extends
JdbcTablePartitionOperations {
- @SuppressWarnings("unused")
+ private static final Logger log =
+ LoggerFactory.getLogger(StarRocksTablePartitionOperations.class);
+
private final JdbcExceptionConverter exceptionConverter;
- @SuppressWarnings("unused")
private final JdbcTypeConverter typeConverter;
public StarRocksTablePartitionOperations(
@@ -53,26 +76,163 @@ public final class StarRocksTablePartitionOperations
extends JdbcTablePartitionO
@Override
public String[] listPartitionNames() { // Returns the PartitionName value of every row produced by SHOW PARTITIONS for the loaded table.
- throw new NotImplementedException("To be implemented in the future");
+ try (Connection connection = getConnection(loadedTable.databaseName())) { // Connection is opened against the database of the loaded table and closed by try-with-resources.
+ String showPartitionsSql = String.format("SHOW PARTITIONS FROM `%s`", // NOTE(review): the table name is placed between backticks without escaping -- confirm names are validated upstream.
loadedTable.name());
+ try (Statement statement = connection.createStatement();
+ ResultSet result = statement.executeQuery(showPartitionsSql)) {
+ ImmutableList.Builder<String> partitionNames = ImmutableList.builder();
+ while (result.next()) {
+ partitionNames.add(result.getString("PartitionName")); // Only the name column is read; other columns of the row are ignored.
+ }
+ return partitionNames.build().toArray(new String[0]); // Yields an empty array when the statement returns no rows.
+ }
+ } catch (SQLException e) {
+ throw exceptionConverter.toGravitinoException(e); // SQL failures are rethrown as whatever exception the converter maps them to.
+ }
}
@Override
public Partition[] listPartitions() { // Returns one Partition per row of SHOW PARTITIONS for the loaded table.
- throw new NotImplementedException("To be implemented in the future");
+ try (Connection connection = getConnection(loadedTable.databaseName())) {
+ Transform partitionInfo = loadedTable.partitioning()[0]; // Only the first partitioning transform is consulted. NOTE(review): this index fails if partitioning() is empty -- confirm callers only reach here for partitioned tables.
+ Map<String, Type> columnTypes = getColumnType(connection); // Column name to Gravitino type, passed to the row conversion below.
+ String showPartitionsSql = String.format("SHOW PARTITIONS FROM `%s`", // NOTE(review): the table name is placed between backticks without escaping -- confirm names are validated upstream.
loadedTable.name());
+ try (Statement statement = connection.createStatement();
+ ResultSet result = statement.executeQuery(showPartitionsSql)) {
+ ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
+ while (result.next()) {
+ partitions.add(
+ StarRocksUtils.fromStarRocksPartition( // Row-to-Partition conversion is delegated to StarRocksUtils, using the current cursor row.
+ loadedTable.name(), result, partitionInfo, columnTypes));
+ }
+ return partitions.build().toArray(new Partition[0]); // Yields an empty array when the statement returns no rows.
+ }
+ } catch (SQLException e) {
+ throw exceptionConverter.toGravitinoException(e); // SQL failures are rethrown as whatever exception the converter maps them to.
+ }
}
@Override
public Partition getPartition(String partitionName) throws
NoSuchPartitionException { // Looks up a single partition by name; throws NoSuchPartitionException when the query returns no row.
- throw new NotImplementedException("To be implemented in the future");
+ try (Connection connection = getConnection(loadedTable.databaseName())) {
+ Transform partitionInfo = loadedTable.partitioning()[0]; // Only the first partitioning transform is consulted. NOTE(review): this index fails if partitioning() is empty -- confirm.
+ Map<String, Type> columnTypes = getColumnType(connection); // Column name to Gravitino type, passed to the row conversion below.
+ String showPartitionsSql =
+ String.format(
+ "SHOW PARTITIONS FROM `%s` WHERE PartitionName = \"%s\"", // NOTE(review): partitionName is spliced into the SQL text unescaped; a name containing a double quote changes the statement -- confirm input is validated.
+ loadedTable.name(), partitionName);
+ try (Statement statement = connection.createStatement();
+ ResultSet result = statement.executeQuery(showPartitionsSql)) {
+ if (result.next()) { // Only the first matching row is converted and returned.
+ return StarRocksUtils.fromStarRocksPartition(
+ loadedTable.name(), result, partitionInfo, columnTypes);
+ }
+ }
+ } catch (SQLException e) {
+ throw exceptionConverter.toGravitinoException(e); // SQL failures are rethrown as whatever exception the converter maps them to.
+ }
+ throw new NoSuchPartitionException("Partition %s does not exist",
partitionName); // Reached only when the query succeeded but produced no row.
}
@Override
public Partition addPartition(Partition partition) throws
PartitionAlreadyExistsException { // Issues ALTER TABLE ... ADD for a range or list partition and returns a copy of it with empty properties.
- throw new NotImplementedException("To be implemented in the future");
+ try (Connection connection = getConnection(loadedTable.databaseName())) {
+ Transform partitionInfo = loadedTable.partitioning()[0]; // Only the first partitioning transform is consulted. NOTE(review): this index fails if partitioning() is empty -- confirm.
+
+ String addPartitionSqlFormat = "ALTER TABLE `%s` ADD %s"; // First placeholder is the table name, second is the generated partition clause.
+ String partitionSqlFragment;
+ Partition added; // The value handed back to the caller once the statement has run.
+
+ if (partition instanceof RangePartition) {
+ Preconditions.checkArgument( // The partition kind must match the partitioning of the table, otherwise IllegalArgumentException.
+ partitionInfo instanceof Transforms.RangeTransform,
+ "Table %s is non-range-partitioned, but trying to add a range
partition",
+ loadedTable.name());
+
+ RangePartition rangePartition = (RangePartition) partition;
+ partitionSqlFragment =
StarRocksUtils.generatePartitionSqlFragment(rangePartition);
+
+ // The partition properties actually cannot be passed into StarRocks,
we just return an
+ // empty
+ // map instead.
+ added =
+ Partitions.range( // Rebuilt from the name and bounds of the input, dropping its properties.
+ rangePartition.name(),
+ rangePartition.upper(),
+ rangePartition.lower(),
+ Collections.emptyMap());
+ } else if (partition instanceof ListPartition) {
+ Preconditions.checkArgument( // The partition kind must match the partitioning of the table, otherwise IllegalArgumentException.
+ partitionInfo instanceof Transforms.ListTransform,
+ "Table %s is non-list-partitioned, but trying to add a list
partition",
+ loadedTable.name());
+
+ ListPartition listPartition = (ListPartition) partition;
+ Literal<?>[][] lists = listPartition.lists(); // One inner array per value tuple of the list partition.
+ Preconditions.checkArgument(
+ lists.length > 0, "The number of values in list partition must be
greater than 0");
+ Preconditions.checkArgument(
+ Arrays.stream(lists)
+ .allMatch( // Every value tuple must have exactly as many literals as the transform has field names.
+ part ->
+ part.length
+ == ((Transforms.ListTransform)
partitionInfo).fieldNames().length),
+ "The number of partitioning columns must be consistent");
+
+ partitionSqlFragment =
StarRocksUtils.generatePartitionSqlFragment(listPartition);
+
+ added =
+ Partitions.list(listPartition.name(), listPartition.lists(),
Collections.emptyMap()); // As in the range branch, the returned copy carries an empty properties map.
+ } else {
+ throw new IllegalArgumentException("Unsupported partition type of
StarRocks");
+ }
+ log.info("Generated add partition sql : {}", partitionSqlFragment); // Logs only the partition clause, not the full ALTER TABLE statement.
+ try (Statement statement = connection.createStatement()) {
+ statement.executeUpdate(
+ String.format(addPartitionSqlFormat, loadedTable.name(),
partitionSqlFragment));
+ return added;
+ }
+ } catch (SQLException e) {
+ throw exceptionConverter.toGravitinoException(e); // SQL failures are rethrown as whatever exception the converter maps them to.
+ }
}
@Override
public boolean dropPartition(String partitionName) { // Drops the named partition; returns true on success and false when the failure maps to NoSuchPartitionException.
- throw new NotImplementedException("To be implemented in the future");
+ try (Connection connection = getConnection(loadedTable.databaseName())) {
+ String dropPartitionsSql =
+ String.format("ALTER TABLE `%s` DROP PARTITION `%s`", // NOTE(review): both names are placed between backticks without escaping -- confirm input is validated.
loadedTable.name(), partitionName);
+ try (Statement statement = connection.createStatement()) {
+ statement.executeUpdate(dropPartitionsSql);
+ return true; // Reached only when the statement ran without an SQLException.
+ }
+ } catch (SQLException e) {
+ GravitinoRuntimeException exception =
exceptionConverter.toGravitinoException(e);
+ if (exception instanceof NoSuchPartitionException) { // A missing partition is reported through the return value rather than an exception.
+ return false;
+ }
+ throw exception; // Any other converted failure propagates to the caller.
+ }
+ }
+
+ private Map<String, Type> getColumnType(Connection connection) throws
SQLException { // Builds a map from column name to Gravitino type for the loaded table, using JDBC column metadata.
+ DatabaseMetaData metaData = connection.getMetaData();
+ try (ResultSet result =
+ metaData.getColumns(
+ connection.getCatalog(), connection.getSchema(),
loadedTable.name(), null)) { // A null column pattern requests all columns.
+ ImmutableMap.Builder<String, Type> columnTypes = ImmutableMap.builder();
+ while (result.next()) {
+ if (Objects.equals(result.getString("TABLE_NAME"),
loadedTable.name())) { // Keeps only rows whose table name matches exactly; presumably guards against getColumns treating the name as a pattern -- confirm.
+ JdbcTypeConverter.JdbcTypeBean typeBean =
+ new
JdbcTypeConverter.JdbcTypeBean(result.getString("TYPE_NAME"));
+ typeBean.setColumnSize(result.getInt("COLUMN_SIZE")); // getInt returns 0 for SQL NULL, so a missing size is passed on as 0.
+ typeBean.setScale(result.getInt("DECIMAL_DIGITS")); // getInt returns 0 for SQL NULL, so a missing scale is passed on as 0.
+ Type gravitinoType = typeConverter.toGravitino(typeBean);
+ String columnName = result.getString("COLUMN_NAME");
+ columnTypes.put(columnName, gravitinoType);
+ }
+ }
+ return columnTypes.build(); // The returned map is immutable.
+ }
}
}