This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new e8eeaca93 [#3064] feat(jdbc-doris): Supports partition management in
Doris catalog (#3961)
e8eeaca93 is described below
commit e8eeaca93dd4b5a8e404e73807446c5ca422001a
Author: XiaoZ <[email protected]>
AuthorDate: Wed Jul 24 14:19:27 2024 +0800
[#3064] feat(jdbc-doris): Supports partition management in Doris catalog
(#3961)
### What changes were proposed in this pull request?
Adds partition management (list, get, add, and drop partitions) to the Doris catalog.
### Why are the changes needed?
Fix: #3064
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (TestDorisTablePartitionOperations) and integration tests (CatalogDorisIT).
---------
Co-authored-by: zhanghan18 <[email protected]>
---
.../rel/expressions/literals/Literals.java | 2 +-
.../catalog/jdbc/JdbcCatalogOperations.java | 4 +
.../apache/gravitino/catalog/jdbc/JdbcTable.java | 52 ++-
.../jdbc/operation/JdbcTableOperations.java | 5 +-
.../operation/JdbcTablePartitionOperations.java | 52 +++
.../catalog/jdbc/operation/TableOperation.java | 4 +
.../DorisTablePartitionPropertiesMetadata.java | 64 +++
.../doris/converter/DorisExceptionConverter.java | 27 ++
.../doris/operation/DorisTableOperations.java | 7 +
.../operation/DorisTablePartitionOperations.java | 334 +++++++++++++++
.../doris/integration/test/CatalogDorisIT.java | 457 +++++++++++++++------
.../catalog/doris/operation/TestDoris.java | 7 +-
.../TestDorisTablePartitionOperations.java | 360 ++++++++++++++++
.../org/apache/gravitino/connector/BaseTable.java | 7 +-
14 files changed, 1228 insertions(+), 154 deletions(-)
diff --git
a/api/src/main/java/org/apache/gravitino/rel/expressions/literals/Literals.java
b/api/src/main/java/org/apache/gravitino/rel/expressions/literals/Literals.java
index f9f6eb677..f789afbb8 100644
---
a/api/src/main/java/org/apache/gravitino/rel/expressions/literals/Literals.java
+++
b/api/src/main/java/org/apache/gravitino/rel/expressions/literals/Literals.java
@@ -39,7 +39,7 @@ public class Literals {
* @param value the literal value
* @param dataType the data type of the literal
* @param <T> the JVM type of value held by the literal
- * @return a new {@link org.apache.gravitino.rel.expressions.Literal}
instance
+ * @return a new {@link Literal} instance
*/
public static <T> LiteralImpl<T> of(T value, Type dataType) {
return new LiteralImpl<>(value, dataType);
diff --git
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
index 8961df99e..4d2747a66 100644
---
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
+++
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcCatalogOperations.java
@@ -333,6 +333,8 @@ public class JdbcCatalogOperations implements
CatalogOperations, SupportsSchemas
.withProperties(properties)
.withIndexes(load.index())
.withPartitioning(load.partitioning())
+ .withDatabaseName(databaseName)
+ .withTableOperation(tableOperation)
.build();
}
@@ -450,6 +452,8 @@ public class JdbcCatalogOperations implements
CatalogOperations, SupportsSchemas
.withProperties(jdbcTablePropertiesMetadata.convertFromJdbcProperties(resultProperties))
.withPartitioning(partitioning)
.withIndexes(indexes)
+ .withDatabaseName(databaseName)
+ .withTableOperation(tableOperation)
.build();
}
diff --git
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcTable.java
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcTable.java
index 6a5fc70e8..54a221866 100644
---
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcTable.java
+++
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/JdbcTable.java
@@ -20,28 +20,69 @@ package org.apache.gravitino.catalog.jdbc;
import com.google.common.collect.Maps;
import java.util.Map;
-import lombok.Getter;
import lombok.ToString;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.gravitino.catalog.jdbc.operation.TableOperation;
import org.apache.gravitino.connector.BaseTable;
import org.apache.gravitino.connector.TableOperations;
+import org.apache.gravitino.rel.SupportsPartitions;
/** Represents a Jdbc Table entity in the jdbc table. */
@ToString
-@Getter
public class JdbcTable extends BaseTable {
+ private String databaseName;
+ private TableOperation tableOperation;
private JdbcTable() {}
@Override
protected TableOperations newOps() {
- // TODO: Implement this method when we have the JDBC table operations.
- throw new UnsupportedOperationException("JdbcTable does not support
TableOperations.");
+ if (ArrayUtils.isEmpty(partitioning)) {
+ throw new UnsupportedOperationException(
+ "Table partition operation is not supported for non-partitioned
table: " + name);
+ }
+ return tableOperation.createJdbcTablePartitionOperations(this);
+ }
+
+ @Override
+ public SupportsPartitions supportPartitions() throws
UnsupportedOperationException {
+ return (SupportsPartitions) ops();
+ }
+
+ public String databaseName() {
+ return databaseName;
}
/** A builder class for constructing JdbcTable instances. */
public static class Builder extends BaseTableBuilder<Builder, JdbcTable> {
+ private String databaseName;
+ private TableOperation tableOperation;
+
+ /**
+ * Sets the name of the database.
+ *
+ * @param databaseName The name of the database.
+ * @return This Builder instance.
+ */
+ public Builder withDatabaseName(String databaseName) {
+ this.databaseName = databaseName;
+ return this;
+ }
+
+ /**
+ * Sets the table operation to be used for partition operations.
+ *
+ * @param tableOperation The instance of TableOperation.
+ * @return This Builder instance.
+ */
+ public Builder withTableOperation(TableOperation tableOperation) {
+ this.tableOperation = tableOperation;
+ return this;
+ }
+
/** Creates a new instance of {@link Builder}. */
private Builder() {}
+
/**
* Internal method to build a JdbcTable instance using the provided values.
*
@@ -58,6 +99,9 @@ public class JdbcTable extends BaseTable {
jdbcTable.partitioning = partitioning;
jdbcTable.sortOrders = sortOrders;
jdbcTable.indexes = indexes;
+ jdbcTable.proxyPlugin = proxyPlugin;
+ jdbcTable.databaseName = databaseName;
+ jdbcTable.tableOperation = tableOperation;
return jdbcTable;
}
diff --git
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
index bc431f92b..2688c6aa1 100644
---
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
+++
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTableOperations.java
@@ -155,7 +155,7 @@ public abstract class JdbcTableOperations implements
TableOperation {
JdbcTable.Builder builder = null;
while (tablesResult.next() && !found) {
if (Objects.equals(tablesResult.getString("TABLE_NAME"), tableName)) {
- builder = getBasicJdbcTableInfo(tablesResult);
+ builder =
getBasicJdbcTableInfo(tablesResult).withDatabaseName(databaseName);
found = true;
}
}
@@ -210,7 +210,8 @@ public abstract class JdbcTableOperations implements
TableOperation {
// 6.Leave the information to the bottom layer to append the table
correctJdbcTableFields(connection, databaseName, tableName,
jdbcTableBuilder);
- return jdbcTableBuilder.build();
+
+ return jdbcTableBuilder.withTableOperation(this).build();
} catch (SQLException e) {
throw exceptionMapper.toGravitinoException(e);
}
diff --git
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTablePartitionOperations.java
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTablePartitionOperations.java
new file mode 100644
index 000000000..787c1539b
--- /dev/null
+++
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/JdbcTablePartitionOperations.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.jdbc.operation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
+import javax.sql.DataSource;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.connector.TableOperations;
+import org.apache.gravitino.rel.SupportsPartitions;
+
+public abstract class JdbcTablePartitionOperations implements TableOperations,
SupportsPartitions {
+ protected final DataSource dataSource;
+ protected final JdbcTable loadedTable;
+
+ protected JdbcTablePartitionOperations(DataSource dataSource, JdbcTable
loadedTable) {
+ checkArgument(dataSource != null, "dataSource is null");
+ checkArgument(loadedTable != null, "loadedTable is null");
+ this.dataSource = dataSource;
+ this.loadedTable = loadedTable;
+ }
+
+ protected Connection getConnection(String databaseName) throws SQLException {
+ Connection connection = dataSource.getConnection();
+ connection.setCatalog(databaseName);
+ return connection;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // Nothing to be closed.
+ }
+}
diff --git
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
index 4f3c04dad..f22bd7453 100644
---
a/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
+++
b/catalogs/catalog-jdbc-common/src/main/java/org/apache/gravitino/catalog/jdbc/operation/TableOperation.java
@@ -114,4 +114,8 @@ public interface TableOperation {
* @param tableName The name of the table.
*/
boolean purge(String databaseName, String tableName);
+
+ default JdbcTablePartitionOperations
createJdbcTablePartitionOperations(JdbcTable loadedTable) {
+ throw new UnsupportedOperationException("Table partition operation is not
supported yet");
+ }
}
diff --git
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/DorisTablePartitionPropertiesMetadata.java
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/DorisTablePartitionPropertiesMetadata.java
new file mode 100644
index 000000000..1f7877e09
--- /dev/null
+++
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/DorisTablePartitionPropertiesMetadata.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.doris;
+
+import static
org.apache.gravitino.connector.PropertyEntry.stringReservedPropertyEntry;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.connector.BasePropertiesMetadata;
+import org.apache.gravitino.connector.PropertyEntry;
+
+public class DorisTablePartitionPropertiesMetadata extends
BasePropertiesMetadata {
+ public static final String NAME = "PartitionName";
+ public static final String VALUES_RANGE = "Range";
+ public static final String ID = "PartitionId";
+ public static final String KEY = "PartitionKey";
+ public static final String VISIBLE_VERSION = "VisibleVersion";
+ public static final String VISIBLE_VERSION_TIME = "VisibleVersionTime";
+ public static final String STATE = "State";
+ public static final String DATA_SIZE = "DataSize";
+ public static final String IS_IN_MEMORY = "IsInMemory";
+ private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
+
+ static {
+ List<PropertyEntry<?>> propertyEntries =
+ ImmutableList.of(
+ stringReservedPropertyEntry(NAME, "name of the partition", false),
+ stringReservedPropertyEntry(VALUES_RANGE, "value range of the
partition", false),
+ stringReservedPropertyEntry(ID, "id of the partition", false),
+ stringReservedPropertyEntry(KEY, "partition column of the
partition", false),
+ stringReservedPropertyEntry(VISIBLE_VERSION, "visible version of
the partition", false),
+ stringReservedPropertyEntry(
+ VISIBLE_VERSION_TIME, "visible version time of the partition",
false),
+ stringReservedPropertyEntry(STATE, "state of the partition",
false),
+ stringReservedPropertyEntry(DATA_SIZE, "data size of the
partition", false),
+ stringReservedPropertyEntry(
+ IS_IN_MEMORY, "whether data of the partition is stored in
memory", false));
+
+ PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries,
PropertyEntry::getName);
+ }
+
+ @Override
+ protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
+ return PROPERTIES_METADATA;
+ }
+}
diff --git
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/converter/DorisExceptionConverter.java
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/converter/DorisExceptionConverter.java
index 897354d50..9c1cc11a2 100644
---
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/converter/DorisExceptionConverter.java
+++
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/converter/DorisExceptionConverter.java
@@ -24,8 +24,10 @@ import java.util.regex.Pattern;
import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
import org.apache.gravitino.exceptions.NoSuchColumnException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.exceptions.UnauthorizedException;
@@ -44,6 +46,8 @@ public class DorisExceptionConverter extends
JdbcExceptionConverter {
static final int CODE_UNAUTHORIZED = 1045;
static final int CODE_NO_SUCH_COLUMN = 1054;
static final int CODE_OTHER = 1105;
+ static final int CODE_DELETE_NON_EXISTING_PARTITION = 1507;
+ static final int CODE_PARTITION_ALREADY_EXISTS = 1517;
private static final String DATABASE_ALREADY_EXISTS_PATTERN_STRING =
".*?detailMessage = Can't create database '.*?'; database exists";
@@ -66,6 +70,18 @@ public class DorisExceptionConverter extends
JdbcExceptionConverter {
private static final Pattern TABLE_NOT_EXIST_PATTERN =
Pattern.compile(TABLE_NOT_EXIST_PATTERN_STRING);
+ private static final String DELETE_NON_EXISTING_PARTITION_STRING =
+ ".*?detailMessage = Error in list of partitions to .*?";
+
+ private static final Pattern DELETE_NON_EXISTING_PARTITION =
+ Pattern.compile(DELETE_NON_EXISTING_PARTITION_STRING);
+
+ private static final String PARTITION_ALREADY_EXISTS_STRING =
+ ".*?detailMessage = Duplicate partition name .*?";
+
+ private static final Pattern PARTITION_ALREADY_EXISTS_PARTITION =
+ Pattern.compile(PARTITION_ALREADY_EXISTS_STRING);
+
@SuppressWarnings("FormatStringAnnotation")
@Override
public GravitinoRuntimeException toGravitinoException(SQLException se) {
@@ -88,6 +104,10 @@ public class DorisExceptionConverter extends
JdbcExceptionConverter {
return new UnauthorizedException(se, se.getMessage());
case CODE_NO_SUCH_COLUMN:
return new NoSuchColumnException(se, se.getMessage());
+ case CODE_DELETE_NON_EXISTING_PARTITION:
+ return new NoSuchPartitionException(se, se.getMessage());
+ case CODE_PARTITION_ALREADY_EXISTS:
+ return new PartitionAlreadyExistsException(se, se.getMessage());
default:
return new GravitinoRuntimeException(se, se.getMessage());
}
@@ -114,6 +134,13 @@ public class DorisExceptionConverter extends
JdbcExceptionConverter {
return CODE_NO_SUCH_TABLE;
}
+ if (DELETE_NON_EXISTING_PARTITION.matcher(message).matches()) {
+ return CODE_DELETE_NON_EXISTING_PARTITION;
+ }
+
+ if (PARTITION_ALREADY_EXISTS_PARTITION.matcher(message).matches()) {
+ return CODE_PARTITION_ALREADY_EXISTS;
+ }
return CODE_OTHER;
}
}
diff --git
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
index e54f03ce8..b25bd14c3 100644
---
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
+++
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTableOperations.java
@@ -46,6 +46,7 @@ import org.apache.gravitino.catalog.doris.utils.DorisUtils;
import org.apache.gravitino.catalog.jdbc.JdbcColumn;
import org.apache.gravitino.catalog.jdbc.JdbcTable;
import org.apache.gravitino.catalog.jdbc.operation.JdbcTableOperations;
+import
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
import org.apache.gravitino.exceptions.NoSuchColumnException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
@@ -83,6 +84,12 @@ public class DorisTableOperations extends
JdbcTableOperations {
}
}
+ @Override
+ public JdbcTablePartitionOperations
createJdbcTablePartitionOperations(JdbcTable loadedTable) {
+ return new DorisTablePartitionOperations(
+ dataSource, loadedTable, exceptionMapper, typeConverter);
+ }
+
@Override
protected String generateCreateTableSql(
String tableName,
diff --git
a/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTablePartitionOperations.java
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTablePartitionOperations.java
new file mode 100644
index 000000000..c3e5a2bba
--- /dev/null
+++
b/catalogs/catalog-jdbc-doris/src/main/java/org/apache/gravitino/catalog/doris/operation/DorisTablePartitionOperations.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.doris.operation;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.DATA_SIZE;
+import static
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.ID;
+import static
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.IS_IN_MEMORY;
+import static
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.KEY;
+import static
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.NAME;
+import static
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.STATE;
+import static
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VALUES_RANGE;
+import static
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VISIBLE_VERSION;
+import static
org.apache.gravitino.catalog.doris.DorisTablePartitionPropertiesMetadata.VISIBLE_VERSION_TIME;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import javax.sql.DataSource;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Type;
+
+public final class DorisTablePartitionOperations extends
JdbcTablePartitionOperations {
+ private static final String PARTITION_TYPE_VALUE_PATTERN_STRING =
+ "types: \\[([^\\]]+)\\]; keys: \\[([^\\]]+)\\];";
+ private static final Pattern PARTITION_TYPE_VALUE_PATTERN =
+ Pattern.compile(PARTITION_TYPE_VALUE_PATTERN_STRING);
+
+ private final JdbcExceptionConverter exceptionConverter;
+ private final JdbcTypeConverter typeConverter;
+
+ public DorisTablePartitionOperations(
+ DataSource dataSource,
+ JdbcTable loadedTable,
+ JdbcExceptionConverter exceptionConverter,
+ JdbcTypeConverter typeConverter) {
+ super(dataSource, loadedTable);
+ checkArgument(exceptionConverter != null, "exceptionConverter is null");
+ checkArgument(typeConverter != null, "typeConverter is null");
+ this.exceptionConverter = exceptionConverter;
+ this.typeConverter = typeConverter;
+ }
+
+ @Override
+ public String[] listPartitionNames() {
+ try (Connection connection = getConnection(loadedTable.databaseName())) {
+ String showPartitionsSql = String.format("SHOW PARTITIONS FROM `%s`",
loadedTable.name());
+ try (Statement statement = connection.createStatement();
+ ResultSet result = statement.executeQuery(showPartitionsSql)) {
+ ImmutableList.Builder<String> partitionNames = ImmutableList.builder();
+ while (result.next()) {
+ partitionNames.add(result.getString("PartitionName"));
+ }
+ return partitionNames.build().toArray(new String[0]);
+ }
+ } catch (SQLException e) {
+ throw exceptionConverter.toGravitinoException(e);
+ }
+ }
+
+ @Override
+ public Partition[] listPartitions() {
+ try (Connection connection = getConnection(loadedTable.databaseName())) {
+ Transform partitionInfo = loadedTable.partitioning()[0];
+ Map<String, Type> columnTypes = getColumnType(connection);
+ String showPartitionsSql = String.format("SHOW PARTITIONS FROM `%s`",
loadedTable.name());
+ try (Statement statement = connection.createStatement();
+ ResultSet result = statement.executeQuery(showPartitionsSql)) {
+ ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
+ while (result.next()) {
+ partitions.add(fromDorisPartition(result, partitionInfo,
columnTypes));
+ }
+ return partitions.build().toArray(new Partition[0]);
+ }
+ } catch (SQLException e) {
+ throw exceptionConverter.toGravitinoException(e);
+ }
+ }
+
+ @Override
+ public Partition getPartition(String partitionName) throws
NoSuchPartitionException {
+ try (Connection connection = getConnection(loadedTable.databaseName())) {
+ Transform partitionInfo = loadedTable.partitioning()[0];
+ Map<String, Type> columnTypes = getColumnType(connection);
+ String showPartitionsSql =
+ String.format(
+ "SHOW PARTITIONS FROM `%s` WHERE PartitionName = \"%s\"",
+ loadedTable.name(), partitionName);
+ try (Statement statement = connection.createStatement();
+ ResultSet result = statement.executeQuery(showPartitionsSql)) {
+ if (result.next()) {
+ return fromDorisPartition(result, partitionInfo, columnTypes);
+ }
+ }
+ } catch (SQLException e) {
+ throw exceptionConverter.toGravitinoException(e);
+ }
+ throw new NoSuchPartitionException("Partition %s does not exist",
partitionName);
+ }
+
+ @Override
+ public Partition addPartition(Partition partition) throws
PartitionAlreadyExistsException {
+ try (Connection connection = getConnection(loadedTable.databaseName())) {
+ Transform partitionInfo = loadedTable.partitioning()[0];
+
+ String addPartitionSqlFormat = "ALTER TABLE `%s` ADD PARTITION `%s`
VALUES %s";
+ String partitionValues;
+ Partition added;
+
+ if (partition instanceof RangePartition) {
+ Preconditions.checkArgument(
+ partitionInfo instanceof Transforms.RangeTransform,
+ "Table %s is non-range-partitioned, but trying to add a range
partition",
+ loadedTable.name());
+
+ RangePartition rangePartition = (RangePartition) partition;
+ partitionValues = buildRangePartitionValues(rangePartition);
+
+ // Partition properties cannot actually be passed into Doris, so we just
+ // return an empty map instead.
+ added =
+ Partitions.range(
+ rangePartition.name(),
+ rangePartition.upper(),
+ rangePartition.lower(),
+ Collections.emptyMap());
+ } else if (partition instanceof ListPartition) {
+ Preconditions.checkArgument(
+ partitionInfo instanceof Transforms.ListTransform,
+ "Table %s is non-list-partitioned, but trying to add a list
partition",
+ loadedTable.name());
+
+ ListPartition listPartition = (ListPartition) partition;
+ partitionValues =
+ buildListPartitionValues(
+ listPartition, ((Transforms.ListTransform)
partitionInfo).fieldNames().length);
+
+ added =
+ Partitions.list(listPartition.name(), listPartition.lists(),
Collections.emptyMap());
+ } else {
+ throw new IllegalArgumentException("Unsupported partition type of
Doris");
+ }
+
+ try (Statement statement = connection.createStatement()) {
+ statement.executeUpdate(
+ String.format(
+ addPartitionSqlFormat, loadedTable.name(), partition.name(),
partitionValues));
+ return added;
+ }
+ } catch (SQLException e) {
+ throw exceptionConverter.toGravitinoException(e);
+ }
+ }
+
+ @Override
+ public boolean dropPartition(String partitionName) {
+ try (Connection connection = getConnection(loadedTable.databaseName())) {
+ String dropPartitionsSql =
+ String.format("ALTER TABLE `%s` DROP PARTITION `%s`",
loadedTable.name(), partitionName);
+ try (Statement statement = connection.createStatement()) {
+ statement.executeUpdate(dropPartitionsSql);
+ return true;
+ }
+ } catch (SQLException e) {
+ GravitinoRuntimeException exception =
exceptionConverter.toGravitinoException(e);
+ if (exception instanceof NoSuchPartitionException) {
+ return false;
+ }
+ throw exception;
+ }
+ }
+
+ private Partition fromDorisPartition(
+ ResultSet resultSet, Transform partitionInfo, Map<String, Type>
columnTypes)
+ throws SQLException {
+ String partitionName = resultSet.getString(NAME);
+ String partitionKey = resultSet.getString(KEY);
+ String partitionValues = resultSet.getString(VALUES_RANGE);
+ ImmutableMap.Builder<String, String> propertiesBuilder =
ImmutableMap.builder();
+ propertiesBuilder.put(ID, resultSet.getString(ID));
+ propertiesBuilder.put(VISIBLE_VERSION,
resultSet.getString(VISIBLE_VERSION));
+ propertiesBuilder.put(VISIBLE_VERSION_TIME,
resultSet.getString(VISIBLE_VERSION_TIME));
+ propertiesBuilder.put(STATE, resultSet.getString(STATE));
+ propertiesBuilder.put(KEY, partitionKey);
+ propertiesBuilder.put(DATA_SIZE, resultSet.getString(DATA_SIZE));
+ propertiesBuilder.put(IS_IN_MEMORY, resultSet.getString(IS_IN_MEMORY));
+ ImmutableMap<String, String> properties = propertiesBuilder.build();
+
+ String[] partitionKeys = partitionKey.split(", ");
+ if (partitionInfo instanceof Transforms.RangeTransform) {
+ if (partitionKeys.length != 1) {
+ throw new UnsupportedOperationException(
+ "Multi-column range partitioning in Doris is not supported yet");
+ }
+ Type partitionColumnType = columnTypes.get(partitionKeys[0]);
+ Literal<?> lower = Literals.NULL;
+ Literal<?> upper = Literals.NULL;
+ Matcher matcher = PARTITION_TYPE_VALUE_PATTERN.matcher(partitionValues);
+ if (matcher.find()) {
+ String lowerValue = matcher.group(2);
+ lower = Literals.of(lowerValue, partitionColumnType);
+ if (matcher.find()) {
+ String upperValue = matcher.group(2);
+ upper = Literals.of(upperValue, partitionColumnType);
+ }
+ }
+ return Partitions.range(partitionName, upper, lower, properties);
+ } else if (partitionInfo instanceof Transforms.ListTransform) {
+ Matcher matcher = PARTITION_TYPE_VALUE_PATTERN.matcher(partitionValues);
+ ImmutableList.Builder<Literal<?>[]> lists = ImmutableList.builder();
+ while (matcher.find()) {
+ String[] values = matcher.group(2).split(", ");
+ ImmutableList.Builder<Literal<?>> literValues =
ImmutableList.builder();
+ for (int i = 0; i < values.length; i++) {
+ Type partitionColumnType = columnTypes.get(partitionKeys[i]);
+ literValues.add(Literals.of(values[i], partitionColumnType));
+ }
+ lists.add(literValues.build().toArray(new Literal<?>[0]));
+ }
+ return Partitions.list(
+ partitionName, lists.build().toArray(new Literal<?>[0][0]),
properties);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("%s is not a partitioned table", loadedTable.name()));
+ }
+ }
+
+ private Map<String, Type> getColumnType(Connection connection) throws
SQLException {
+ DatabaseMetaData metaData = connection.getMetaData();
+ try (ResultSet result =
+ metaData.getColumns(
+ connection.getCatalog(), connection.getSchema(),
loadedTable.name(), null)) {
+ ImmutableMap.Builder<String, Type> columnTypes = ImmutableMap.builder();
+ while (result.next()) {
+ if (Objects.equals(result.getString("TABLE_NAME"),
loadedTable.name())) {
+ JdbcTypeConverter.JdbcTypeBean typeBean =
+ new
JdbcTypeConverter.JdbcTypeBean(result.getString("TYPE_NAME"));
+ typeBean.setColumnSize(result.getString("COLUMN_SIZE"));
+ typeBean.setScale(result.getString("DECIMAL_DIGITS"));
+ Type gravitinoType = typeConverter.toGravitino(typeBean);
+ String columnName = result.getString("COLUMN_NAME");
+ columnTypes.put(columnName, gravitinoType);
+ }
+ }
+ return columnTypes.build();
+ }
+ }
+
+ private String buildRangePartitionValues(RangePartition rangePartition) {
+ Literal<?> upper = rangePartition.upper();
+ Literal<?> lower = rangePartition.lower();
+ String partitionValues;
+ if (Literals.NULL.equals(upper) && Literals.NULL.equals(lower)) {
+ partitionValues = "LESS THAN MAXVALUE";
+ } else if (Literals.NULL.equals(lower)) {
+ partitionValues = "LESS THAN (\"" + upper.value() + "\")";
+ } else if (Literals.NULL.equals(upper)) {
+ partitionValues = "[(\"" + lower.value() + "\"), (MAXVALUE))";
+ } else {
+ partitionValues = "[(\"" + lower.value() + "\"), (\"" + upper.value() +
"\"))";
+ }
+ return partitionValues;
+ }
+
+ private String buildListPartitionValues(ListPartition listPartition, int
partitionedFieldNums) {
+ Literal<?>[][] lists = listPartition.lists();
+ Preconditions.checkArgument(
+ lists.length > 0, "The number of values in list partition must be
greater than 0");
+
+ ImmutableList.Builder<String> listValues = ImmutableList.builder();
+ for (Literal<?>[] part : lists) {
+ Preconditions.checkArgument(
+ part.length == partitionedFieldNums,
+ "The number of partitioning columns must be consistent");
+
+ StringBuilder values = new StringBuilder();
+ if (part.length > 1) {
+ values
+ .append("(")
+ .append(
+ Arrays.stream(part)
+ .map(p -> "\"" + p.value() + "\"")
+ .collect(Collectors.joining(",")))
+ .append(")");
+ } else {
+ values.append("\"").append(part[0].value()).append("\"");
+ }
+ listValues.add(values.toString());
+ }
+ return String.format("IN (%s)",
listValues.build().stream().collect(Collectors.joining(",")));
+ }
+}
diff --git
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
index b654f6552..2f32dc155 100644
---
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
+++
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
@@ -19,6 +19,9 @@
package org.apache.gravitino.catalog.doris.integration.test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@@ -28,7 +31,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
@@ -36,6 +41,7 @@ import org.apache.gravitino.Schema;
import org.apache.gravitino.SupportsSchemas;
import org.apache.gravitino.catalog.jdbc.config.JdbcConfig;
import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.integration.test.container.ContainerSuite;
@@ -44,21 +50,27 @@ import
org.apache.gravitino.integration.test.util.AbstractIT;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
import org.apache.gravitino.integration.test.util.ITUtils;
import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.SupportsPartitions;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.NamedReference;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.indexes.Index;
import org.apache.gravitino.rel.indexes.Indexes;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.utils.RandomNameUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -127,12 +139,12 @@ public class CatalogDorisIT extends AbstractIT {
private void createMetalake() {
GravitinoMetalake[] gravitinoMetaLakes = AbstractIT.client.listMetalakes();
- Assertions.assertEquals(0, gravitinoMetaLakes.length);
+ assertEquals(0, gravitinoMetaLakes.length);
GravitinoMetalake createdMetalake =
AbstractIT.client.createMetalake(metalakeName, "comment",
Collections.emptyMap());
GravitinoMetalake loadMetalake =
AbstractIT.client.loadMetalake(metalakeName);
- Assertions.assertEquals(createdMetalake, loadMetalake);
+ assertEquals(createdMetalake, loadMetalake);
metalake = loadMetalake;
}
@@ -160,7 +172,7 @@ public class CatalogDorisIT extends AbstractIT {
"doris catalog comment",
catalogProperties);
Catalog loadCatalog = metalake.loadCatalog(catalogName);
- Assertions.assertEquals(createdCatalog, loadCatalog);
+ assertEquals(createdCatalog, loadCatalog);
catalog = loadCatalog;
}
@@ -174,13 +186,14 @@ public class CatalogDorisIT extends AbstractIT {
Schema createdSchema = catalog.asSchemas().createSchema(ident.name(),
schema_comment, prop);
Schema loadSchema = catalog.asSchemas().loadSchema(ident.name());
- Assertions.assertEquals(createdSchema.name(), loadSchema.name());
+ assertEquals(createdSchema.name(), loadSchema.name());
- Assertions.assertEquals(createdSchema.properties().get(propKey),
propValue);
+ assertEquals(createdSchema.properties().get(propKey), propValue);
}
private Column[] createColumns() {
- Column col1 = Column.of(DORIS_COL_NAME1, Types.IntegerType.get(),
"col_1_comment");
+ Column col1 =
+ Column.of(DORIS_COL_NAME1, Types.IntegerType.get(), "col_1_comment",
false, false, null);
Column col2 = Column.of(DORIS_COL_NAME2, Types.VarCharType.of(10),
"col_2_comment");
Column col3 = Column.of(DORIS_COL_NAME3, Types.VarCharType.of(10),
"col_3_comment");
@@ -203,7 +216,7 @@ public class CatalogDorisIT extends AbstractIT {
// test list schemas
String[] schemaNames = schemas.listSchemas();
- Assertions.assertTrue(Arrays.asList(schemaNames).contains(schemaName));
+ assertTrue(Arrays.asList(schemaNames).contains(schemaName));
// test create schema already exists
String testSchemaName =
GravitinoITUtils.genRandomName("create_schema_test");
@@ -211,29 +224,26 @@ public class CatalogDorisIT extends AbstractIT {
schemas.createSchema(schemaIdent.name(), schema_comment,
Collections.emptyMap());
List<String> schemaNameList = Arrays.asList(schemas.listSchemas());
- Assertions.assertTrue(schemaNameList.contains(testSchemaName));
+ assertTrue(schemaNameList.contains(testSchemaName));
- Assertions.assertThrows(
+ assertThrows(
SchemaAlreadyExistsException.class,
- () -> {
- schemas.createSchema(schemaIdent.name(), schema_comment,
Collections.emptyMap());
- });
+ () -> schemas.createSchema(schemaIdent.name(), schema_comment,
Collections.emptyMap()));
// test drop schema
- Assertions.assertTrue(schemas.dropSchema(schemaIdent.name(), false));
+ assertTrue(schemas.dropSchema(schemaIdent.name(), false));
// check schema is deleted
// 1. check by load schema
- Assertions.assertThrows(
- NoSuchSchemaException.class, () ->
schemas.loadSchema(schemaIdent.name()));
+ assertThrows(NoSuchSchemaException.class, () ->
schemas.loadSchema(schemaIdent.name()));
// 2. check by list schema
schemaNameList = Arrays.asList(schemas.listSchemas());
- Assertions.assertFalse(schemaNameList.contains(testSchemaName));
+ assertFalse(schemaNameList.contains(testSchemaName));
// test drop schema not exists
NameIdentifier notExistsSchemaIdent = NameIdentifier.of(metalakeName,
catalogName, "no-exits");
- Assertions.assertFalse(schemas.dropSchema(notExistsSchemaIdent.name(),
false));
+ assertFalse(schemas.dropSchema(notExistsSchemaIdent.name(), false));
}
@Test
@@ -255,23 +265,19 @@ public class CatalogDorisIT extends AbstractIT {
// Try to drop a database, and cascade equals to false, it should not be
allowed.
Throwable excep =
- Assertions.assertThrows(
+ assertThrows(
RuntimeException.class, () ->
catalog.asSchemas().dropSchema(schemaName, false));
- Assertions.assertTrue(excep.getMessage().contains("the value of cascade
should be true."));
+ assertTrue(excep.getMessage().contains("the value of cascade should be
true."));
// Check the database still exists
catalog.asSchemas().loadSchema(schemaName);
// Try to drop a database, and cascade equals to true, it should be
allowed.
- Assertions.assertTrue(catalog.asSchemas().dropSchema(schemaName, true));
+ assertTrue(catalog.asSchemas().dropSchema(schemaName, true));
// Check database has been dropped
SupportsSchemas schemas = catalog.asSchemas();
- Assertions.assertThrows(
- NoSuchSchemaException.class,
- () -> {
- schemas.loadSchema(schemaName);
- });
+ assertThrows(NoSuchSchemaException.class, () ->
schemas.loadSchema(schemaName));
}
@Test
@@ -283,54 +289,30 @@ public class CatalogDorisIT extends AbstractIT {
// should throw an exception with string that might contain SQL injection
String sqlInjection = databaseName + "`; DROP TABLE important_table; -- ";
- Assertions.assertThrows(
+ assertThrows(
IllegalArgumentException.class,
- () -> {
- schemas.createSchema(sqlInjection, comment, properties);
- });
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> {
- schemas.dropSchema(sqlInjection, false);
- });
+ () -> schemas.createSchema(sqlInjection, comment, properties));
+ assertThrows(IllegalArgumentException.class, () ->
schemas.dropSchema(sqlInjection, false));
String sqlInjection1 = databaseName + "`; SLEEP(10); -- ";
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> {
- schemas.createSchema(sqlInjection1, comment, properties);
- });
- Assertions.assertThrows(
+ assertThrows(
IllegalArgumentException.class,
- () -> {
- schemas.dropSchema(sqlInjection1, false);
- });
+ () -> schemas.createSchema(sqlInjection1, comment, properties));
+ assertThrows(IllegalArgumentException.class, () ->
schemas.dropSchema(sqlInjection1, false));
String sqlInjection2 =
databaseName + "`; UPDATE Users SET password = 'newpassword' WHERE
username = 'admin'; -- ";
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> {
- schemas.createSchema(sqlInjection2, comment, properties);
- });
- Assertions.assertThrows(
+ assertThrows(
IllegalArgumentException.class,
- () -> {
- schemas.dropSchema(sqlInjection2, false);
- });
+ () -> schemas.createSchema(sqlInjection2, comment, properties));
+ assertThrows(IllegalArgumentException.class, () ->
schemas.dropSchema(sqlInjection2, false));
// should throw an exception with input that has more than 64 characters
String invalidInput = StringUtils.repeat("a", 65);
- Assertions.assertThrows(
+ assertThrows(
IllegalArgumentException.class,
- () -> {
- schemas.createSchema(invalidInput, comment, properties);
- });
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> {
- schemas.dropSchema(invalidInput, false);
- });
+ () -> schemas.createSchema(invalidInput, comment, properties));
+ assertThrows(IllegalArgumentException.class, () ->
schemas.dropSchema(invalidInput, false));
}
@Test
@@ -407,24 +389,20 @@ public class CatalogDorisIT extends AbstractIT {
NameIdentifier tableIdentifier =
NameIdentifier.of(metalakeName, catalogName, schemaName, t1_name);
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> {
- tableCatalog.createTable(
- tableIdentifier,
- columns,
- table_comment,
- properties,
- Transforms.EMPTY_TRANSFORM,
- Distributions.NONE,
- new SortOrder[0],
- t1_indexes);
- });
- Assertions.assertThrows(
+ assertThrows(
IllegalArgumentException.class,
- () -> {
- catalog.asTableCatalog().dropTable(tableIdentifier);
- });
+ () ->
+ tableCatalog.createTable(
+ tableIdentifier,
+ columns,
+ table_comment,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new SortOrder[0],
+ t1_indexes));
+ assertThrows(
+ IllegalArgumentException.class, () ->
catalog.asTableCatalog().dropTable(tableIdentifier));
String t2_name = table_name + "`; SLEEP(10); -- ";
Column t2_col = Column.of(t2_name, Types.LongType.get(), "id", false,
false, null);
@@ -433,24 +411,20 @@ public class CatalogDorisIT extends AbstractIT {
NameIdentifier tableIdentifier2 =
NameIdentifier.of(metalakeName, catalogName, schemaName, t2_name);
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> {
- tableCatalog.createTable(
- tableIdentifier2,
- columns2,
- table_comment,
- properties,
- Transforms.EMPTY_TRANSFORM,
- Distributions.NONE,
- new SortOrder[0],
- t2_indexes);
- });
- Assertions.assertThrows(
+ assertThrows(
IllegalArgumentException.class,
- () -> {
- catalog.asTableCatalog().dropTable(tableIdentifier2);
- });
+ () ->
+ tableCatalog.createTable(
+ tableIdentifier2,
+ columns2,
+ table_comment,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new SortOrder[0],
+ t2_indexes));
+ assertThrows(
+ IllegalArgumentException.class, () ->
catalog.asTableCatalog().dropTable(tableIdentifier2));
String t3_name =
table_name + "`; UPDATE Users SET password = 'newpassword' WHERE
username = 'admin'; -- ";
@@ -460,24 +434,20 @@ public class CatalogDorisIT extends AbstractIT {
NameIdentifier tableIdentifier3 =
NameIdentifier.of(metalakeName, catalogName, schemaName, t3_name);
- Assertions.assertThrows(
+ assertThrows(
IllegalArgumentException.class,
- () -> {
- tableCatalog.createTable(
- tableIdentifier3,
- columns3,
- table_comment,
- properties,
- Transforms.EMPTY_TRANSFORM,
- Distributions.NONE,
- new SortOrder[0],
- t3_indexes);
- });
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> {
- catalog.asTableCatalog().dropTable(tableIdentifier3);
- });
+ () ->
+ tableCatalog.createTable(
+ tableIdentifier3,
+ columns3,
+ table_comment,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new SortOrder[0],
+ t3_indexes));
+ assertThrows(
+ IllegalArgumentException.class, () ->
catalog.asTableCatalog().dropTable(tableIdentifier3));
String invalidInput = StringUtils.repeat("a", 65);
Column t4_col = Column.of(invalidInput, Types.LongType.get(), "id", false,
false, null);
@@ -486,24 +456,20 @@ public class CatalogDorisIT extends AbstractIT {
NameIdentifier tableIdentifier4 =
NameIdentifier.of(metalakeName, catalogName, schemaName, invalidInput);
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () -> {
- tableCatalog.createTable(
- tableIdentifier4,
- columns4,
- table_comment,
- properties,
- Transforms.EMPTY_TRANSFORM,
- Distributions.NONE,
- new SortOrder[0],
- t4_indexes);
- });
- Assertions.assertThrows(
+ assertThrows(
IllegalArgumentException.class,
- () -> {
- catalog.asTableCatalog().dropTable(tableIdentifier4);
- });
+ () ->
+ tableCatalog.createTable(
+ tableIdentifier4,
+ columns4,
+ table_comment,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new SortOrder[0],
+ t4_indexes));
+ assertThrows(
+ IllegalArgumentException.class, () ->
catalog.asTableCatalog().dropTable(tableIdentifier4));
}
@Test
@@ -579,9 +545,7 @@ public class CatalogDorisIT extends AbstractIT {
.atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
.pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
.untilAsserted(
- () ->
- Assertions.assertEquals(
- 4,
tableCatalog.loadTable(tableIdentifier).columns().length));
+ () -> assertEquals(4,
tableCatalog.loadTable(tableIdentifier).columns().length));
ITUtils.assertColumn(
Column.of("col_4", Types.VarCharType.of(255), "col_4_comment"),
@@ -598,9 +562,7 @@ public class CatalogDorisIT extends AbstractIT {
.atMost(MAX_WAIT_IN_SECONDS, TimeUnit.SECONDS)
.pollInterval(WAIT_INTERVAL_IN_SECONDS, TimeUnit.SECONDS)
.untilAsserted(
- () ->
- Assertions.assertEquals(
- 3,
tableCatalog.loadTable(tableIdentifier).columns().length));
+ () -> assertEquals(3,
tableCatalog.loadTable(tableIdentifier).columns().length));
}
@Test
@@ -623,7 +585,7 @@ public class CatalogDorisIT extends AbstractIT {
Transforms.EMPTY_TRANSFORM,
distribution,
null);
- Assertions.assertEquals(createdTable.name(), tableName);
+ assertEquals(createdTable.name(), tableName);
// add index test.
tableCatalog.alterTable(
@@ -662,4 +624,225 @@ public class CatalogDorisIT extends AbstractIT {
.index()
.length));
}
+
+  /**
+   * Exercises partition management on a table that is list-partitioned by {@code
+   * DORIS_COL_NAME1}: rejects partitions that do not match the table's partitioning, then adds,
+   * lists, fetches and drops the partitions p1, p2 and p3.
+   */
+  @Test
+  void testDorisTablePartitionOperation() {
+    // Define a table that is list-partitioned on its primary-key column.
+    String tableName = GravitinoITUtils.genRandomName("test_partitioned_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Distribution distribution = createDistribution();
+    Index[] indexes = {
+      Indexes.of(Index.IndexType.PRIMARY_KEY, "k1_index", new String[][] {{DORIS_COL_NAME1}})
+    };
+    Map<String, String> properties = createTableProperties();
+    Transform[] partitioning = {Transforms.list(new String[][] {{DORIS_COL_NAME1}})};
+    List<Column> expectedColumns = Arrays.asList(columns);
+
+    // Create the table and verify the returned metadata.
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            null,
+            indexes);
+    ITUtils.assertionsTableInfo(
+        tableName, table_comment, expectedColumns, properties, indexes, partitioning, createdTable);
+
+    // Load the table again and verify the stored metadata.
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName, table_comment, expectedColumns, properties, indexes, partitioning, loadTable);
+
+    SupportsPartitions tablePartitionOperations = loadTable.supportPartitions();
+
+    // A freshly created table has no partitions yet.
+    assertEquals(0, tablePartitionOperations.listPartitionNames().length);
+    assertEquals(0, tablePartitionOperations.listPartitions().length);
+    assertThrows(NoSuchPartitionException.class, () -> tablePartitionOperations.getPartition("p1"));
+
+    // A range partition cannot be added to a list-partitioned table.
+    Partition incorrectType =
+        Partitions.range("p1", Literals.NULL, Literals.NULL, Collections.emptyMap());
+    assertThrows(
+        IllegalArgumentException.class, () -> tablePartitionOperations.addPartition(incorrectType));
+
+    // A list partition with two values per entry does not match the single partitioning column.
+    Partition incorrectValue =
+        Partitions.list(
+            "p1", new Literal[][] {{Literals.NULL, Literals.NULL}}, Collections.emptyMap());
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> tablePartitionOperations.addPartition(incorrectValue));
+
+    // Add p1..p3, where partition p<i> holds the single integer value i.
+    for (int id = 1; id <= 3; id++) {
+      ListPartition toAdd =
+          Partitions.list(
+              "p" + id, new Literal[][] {{Literals.integerLiteral(id)}}, Collections.emptyMap());
+      ListPartition added = (ListPartition) tablePartitionOperations.addPartition(toAdd);
+      assertIntListPartition("p" + id, String.valueOf(id), added);
+    }
+
+    // All three partitions are visible by name ...
+    Set<String> partitionNames =
+        Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(3, partitionNames.size());
+    for (String expectedName : new String[] {"p1", "p2", "p3"}) {
+      assertTrue(partitionNames.contains(expectedName));
+    }
+
+    // ... in the full partition listing ...
+    Map<String, ListPartition> listed =
+        Arrays.stream(tablePartitionOperations.listPartitions())
+            .collect(Collectors.toMap(Partition::name, ListPartition.class::cast));
+    assertEquals(3, listed.size());
+    for (int id = 1; id <= 3; id++) {
+      assertIntListPartition("p" + id, String.valueOf(id), listed.get("p" + id));
+    }
+
+    // ... and when fetched one by one.
+    for (int id = 1; id <= 3; id++) {
+      ListPartition fetched = (ListPartition) tablePartitionOperations.getPartition("p" + id);
+      assertIntListPartition("p" + id, String.valueOf(id), fetched);
+    }
+
+    // Dropping p3 removes it from the listing and from lookups.
+    assertTrue(tablePartitionOperations.dropPartition("p3"));
+    Set<String> remainingNames =
+        Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+    assertEquals(2, remainingNames.size());
+    assertFalse(remainingNames.contains("p3"));
+    assertThrows(NoSuchPartitionException.class, () -> tablePartitionOperations.getPartition("p3"));
+
+    // Dropping a partition that no longer exists reports false.
+    assertFalse(tablePartitionOperations.dropPartition("p3"));
+  }
+
+  /**
+   * Asserts that {@code actual} has the expected name and holds exactly one entry with exactly
+   * one value, whose string value is {@code expectedValue} and whose type is integer.
+   */
+  private static void assertIntListPartition(
+      String expectedName, String expectedValue, ListPartition actual) {
+    assertEquals(expectedName, actual.name());
+    Literal<?>[][] lists = actual.lists();
+    assertEquals(1, lists.length);
+    assertEquals(1, lists[0].length);
+    assertEquals(expectedValue, lists[0][0].value());
+    assertEquals(Types.IntegerType.get(), lists[0][0].dataType());
+  }
+
+  /**
+   * Verifies that every partition operation throws {@link UnsupportedOperationException} on a
+   * table that was created without partitioning.
+   */
+  @Test
+  void testNonPartitionedTable() {
+    // Define a table with neither partitioning nor indexes.
+    String tableName = GravitinoITUtils.genRandomName("test_non_partitioned_table");
+    NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
+    Column[] columns = createColumns();
+    Distribution distribution = createDistribution();
+    Index[] indexes = Indexes.EMPTY_INDEXES;
+    Map<String, String> properties = createTableProperties();
+    Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
+    List<Column> expectedColumns = Arrays.asList(columns);
+
+    // Create the table and verify the returned metadata.
+    TableCatalog tableCatalog = catalog.asTableCatalog();
+    Table createdTable =
+        tableCatalog.createTable(
+            tableIdentifier,
+            columns,
+            table_comment,
+            properties,
+            partitioning,
+            distribution,
+            null,
+            indexes);
+    ITUtils.assertionsTableInfo(
+        tableName, table_comment, expectedColumns, properties, indexes, partitioning, createdTable);
+
+    // Load the table again and verify the stored metadata.
+    Table loadTable = tableCatalog.loadTable(tableIdentifier);
+    ITUtils.assertionsTableInfo(
+        tableName, table_comment, expectedColumns, properties, indexes, partitioning, loadTable);
+
+    // Every partition operation must be rejected.
+    SupportsPartitions tablePartitionOperations = loadTable.supportPartitions();
+    Partition samplePartition =
+        Partitions.range("p1", Literals.NULL, Literals.NULL, Collections.emptyMap());
+
+    assertThrows(UnsupportedOperationException.class, tablePartitionOperations::listPartitionNames);
+    assertThrows(UnsupportedOperationException.class, tablePartitionOperations::listPartitions);
+    assertThrows(
+        UnsupportedOperationException.class,
+        () -> tablePartitionOperations.addPartition(samplePartition));
+    assertThrows(
+        UnsupportedOperationException.class, () -> tablePartitionOperations.getPartition("p1"));
+    assertThrows(
+        UnsupportedOperationException.class, () -> tablePartitionOperations.dropPartition("p1"));
+  }
}
diff --git
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDoris.java
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDoris.java
index 3f24e75f2..d93f6c817 100644
---
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDoris.java
+++
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDoris.java
@@ -21,7 +21,6 @@ package org.apache.gravitino.catalog.doris.operation;
import com.google.common.collect.Maps;
import java.util.Collections;
import java.util.Map;
-import javax.sql.DataSource;
import
org.apache.gravitino.catalog.doris.converter.DorisColumnDefaultValueConverter;
import org.apache.gravitino.catalog.doris.converter.DorisExceptionConverter;
import org.apache.gravitino.catalog.doris.converter.DorisTypeConverter;
@@ -41,14 +40,14 @@ public class TestDoris extends TestJdbc {
public static void startup() {
containerSuite.startDorisContainer();
- DataSource dataSource =
DataSourceUtils.createDataSource(getDorisCatalogProperties());
+ DATA_SOURCE =
DataSourceUtils.createDataSource(getDorisCatalogProperties());
DATABASE_OPERATIONS = new DorisDatabaseOperations();
TABLE_OPERATIONS = new DorisTableOperations();
JDBC_EXCEPTION_CONVERTER = new DorisExceptionConverter();
- DATABASE_OPERATIONS.initialize(dataSource, JDBC_EXCEPTION_CONVERTER,
Collections.emptyMap());
+ DATABASE_OPERATIONS.initialize(DATA_SOURCE, JDBC_EXCEPTION_CONVERTER,
Collections.emptyMap());
TABLE_OPERATIONS.initialize(
- dataSource,
+ DATA_SOURCE,
JDBC_EXCEPTION_CONVERTER,
new DorisTypeConverter(),
new DorisColumnDefaultValueConverter(),
diff --git
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDorisTablePartitionOperations.java
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDorisTablePartitionOperations.java
new file mode 100644
index 000000000..0494082bd
--- /dev/null
+++
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/operation/TestDorisTablePartitionOperations.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.doris.operation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableMap;
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.catalog.doris.converter.DorisTypeConverter;
+import org.apache.gravitino.catalog.jdbc.JdbcColumn;
+import org.apache.gravitino.catalog.jdbc.JdbcTable;
+import org.apache.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
+import
org.apache.gravitino.catalog.jdbc.operation.JdbcTablePartitionOperations;
+import org.apache.gravitino.exceptions.NoSuchPartitionException;
+import org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.rel.expressions.NamedReference;
+import org.apache.gravitino.rel.expressions.distributions.Distribution;
+import org.apache.gravitino.rel.expressions.distributions.Distributions;
+import org.apache.gravitino.rel.expressions.literals.Literal;
+import org.apache.gravitino.rel.expressions.literals.Literals;
+import org.apache.gravitino.rel.expressions.transforms.Transform;
+import org.apache.gravitino.rel.expressions.transforms.Transforms;
+import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.partitions.ListPartition;
+import org.apache.gravitino.rel.partitions.Partition;
+import org.apache.gravitino.rel.partitions.Partitions;
+import org.apache.gravitino.rel.partitions.RangePartition;
+import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+@Tag("gravitino-docker-it")
+public class TestDorisTablePartitionOperations extends TestDoris {
+ // Name of the database created in startup(); produced by GravitinoITUtils.genRandomName
+ // from the prefix "doris_test_db" and used as the target database for the test tables.
+ private static final String databaseName =
GravitinoITUtils.genRandomName("doris_test_db");
+ // First argument passed to Distributions.hash(...) when the test tables are created.
+ private static final Integer DEFAULT_BUCKET_SIZE = 1;
+ // Type converter handed to the DorisTablePartitionOperations instances built by the tests.
+ private static final JdbcTypeConverter TYPE_CONVERTER = new
DorisTypeConverter();
+
+ /**
+ * Runs the parent Doris test setup and then creates the database used by this class.
+ *
+ * <p>This static method has the same name and signature as {@code TestDoris.startup()} and
+ * therefore hides it, so the parent setup is invoked explicitly before the database is created.
+ */
+ @BeforeAll
+ public static void startup() {
+ TestDoris.startup();
+ createDatabase();
+ }
+
+  /** Creates the database for this test class, with a fixed comment and no properties. */
+  private static void createDatabase() {
+    Map<String, String> noProperties = new HashMap<>();
+    DATABASE_OPERATIONS.create(databaseName, "test_comment", noProperties);
+  }
+
+  /**
+   * Returns the table properties used by the tests: a single {@code replication_allocation}
+   * entry.
+   */
+  private static Map<String, String> createProperties() {
+    ImmutableMap.Builder<String, String> tableProperties = ImmutableMap.builder();
+    tableProperties.put("replication_allocation", "tag.location.default: 1");
+    return tableProperties.build();
+  }
+
+ @Test
+ public void testRangePartition() {
+ String tableComment = "range_partitioned_table_comment";
+ JdbcColumn col1 =
+ JdbcColumn.builder()
+ .withName("col_1")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ JdbcColumn col2 =
+
JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build();
+ JdbcColumn col3 =
+
JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build();
+ JdbcColumn col4 =
+ JdbcColumn.builder()
+ .withName("col_4")
+ .withType(Types.DateType.get())
+ .withNullable(false)
+ .build();
+ List<JdbcColumn> columns = Arrays.asList(col1, col2, col3, col4);
+ Distribution distribution =
+ Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
+ Index[] indexes = new Index[] {};
+ String rangePartitionTableName =
GravitinoITUtils.genRandomName("range_partition_table");
+ Transform[] rangePartition = new Transform[] {Transforms.range(new
String[] {col4.name()})};
+ TABLE_OPERATIONS.create(
+ databaseName,
+ rangePartitionTableName,
+ columns.toArray(new JdbcColumn[] {}),
+ tableComment,
+ createProperties(),
+ rangePartition,
+ distribution,
+ indexes);
+
+ // assert table info
+ JdbcTable rangePartitionTable = TABLE_OPERATIONS.load(databaseName,
rangePartitionTableName);
+ assertionsTableInfo(
+ rangePartitionTableName,
+ tableComment,
+ columns,
+ Collections.emptyMap(),
+ null,
+ rangePartition,
+ rangePartitionTable);
+ List<String> listTables = TABLE_OPERATIONS.listTables(databaseName);
+ assertTrue(listTables.contains(rangePartitionTableName));
+
+ // create Table Partition Operations manually
+ JdbcTablePartitionOperations tablePartitionOperations =
+ new DorisTablePartitionOperations(
+ DATA_SOURCE, rangePartitionTable, JDBC_EXCEPTION_CONVERTER,
TYPE_CONVERTER);
+
+ // assert partition info when there is no partitions actually
+ String[] emptyPartitionNames =
tablePartitionOperations.listPartitionNames();
+ assertEquals(0, emptyPartitionNames.length);
+ Partition[] emptyPartitions = tablePartitionOperations.listPartitions();
+ assertEquals(0, emptyPartitions.length);
+
+ // get non-existing partition
+ assertThrows(NoSuchPartitionException.class, () ->
tablePartitionOperations.getPartition("p1"));
+
+ // add partition with incorrect type
+ Partition incorrect =
+ Partitions.list("test_incorrect", new Literal[][] {{Literals.NULL}},
null);
+ IllegalArgumentException exception =
+ assertThrows(
+ IllegalArgumentException.class, () ->
tablePartitionOperations.addPartition(incorrect));
+ assertEquals(
+ "Table "
+ + rangePartitionTableName
+ + " is non-list-partitioned, but trying to add a list partition",
+ exception.getMessage());
+
+ // add different kinds of range partitions
+ LocalDate today = LocalDate.now();
+ LocalDate tomorrow = today.plusDays(1);
+ Literal<LocalDate> todayLiteral = Literals.dateLiteral(today);
+ Literal<LocalDate> tomorrowLiteral = Literals.dateLiteral(tomorrow);
+ Partition p1 = Partitions.range("p1", todayLiteral, Literals.NULL,
Collections.emptyMap());
+ Partition p2 = Partitions.range("p2", tomorrowLiteral, todayLiteral,
Collections.emptyMap());
+ Partition p3 = Partitions.range("p3", Literals.NULL, tomorrowLiteral,
Collections.emptyMap());
+ assertEquals(p1, tablePartitionOperations.addPartition(p1));
+ assertEquals(p2, tablePartitionOperations.addPartition(p2));
+ assertEquals(p3, tablePartitionOperations.addPartition(p3));
+
+ // add partition with same name
+ Partition p4 = Partitions.range("p3", Literals.NULL, Literals.NULL,
Collections.emptyMap());
+ assertThrows(
+ PartitionAlreadyExistsException.class, () ->
tablePartitionOperations.addPartition(p4));
+
+ // check partitions
+ Set<String> partitionNames =
+
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+ assertEquals(3, partitionNames.size());
+ assertTrue(partitionNames.contains("p1"));
+ assertTrue(partitionNames.contains("p2"));
+ assertTrue(partitionNames.contains("p3"));
+
+ Map<String, RangePartition> partitions =
+ Arrays.stream(tablePartitionOperations.listPartitions())
+ .collect(Collectors.toMap(p -> p.name(), p -> (RangePartition) p));
+ assertEquals(3, partitions.size());
+ RangePartition actualP1 = partitions.get("p1");
+ assertEquals(todayLiteral, actualP1.upper());
+ assertEquals(Literals.of("0000-01-01", Types.DateType.get()),
actualP1.lower());
+ RangePartition actualP2 = partitions.get("p2");
+ assertEquals(tomorrowLiteral, actualP2.upper());
+ assertEquals(todayLiteral, actualP2.lower());
+ RangePartition actualP3 = partitions.get("p3");
+ assertEquals(Literals.of("MAXVALUE", Types.DateType.get()),
actualP3.upper());
+ assertEquals(tomorrowLiteral, actualP3.lower());
+
+ actualP1 = (RangePartition) tablePartitionOperations.getPartition("p1");
+ assertEquals(todayLiteral, actualP1.upper());
+ assertEquals(Literals.of("0000-01-01", Types.DateType.get()),
actualP1.lower());
+ actualP2 = (RangePartition) tablePartitionOperations.getPartition("p2");
+ assertEquals(tomorrowLiteral, actualP2.upper());
+ assertEquals(todayLiteral, actualP2.lower());
+ actualP3 = (RangePartition) tablePartitionOperations.getPartition("p3");
+ assertEquals(Literals.of("MAXVALUE", Types.DateType.get()),
actualP3.upper());
+ assertEquals(tomorrowLiteral, actualP3.lower());
+
+ // drop partition
+ assertTrue(tablePartitionOperations.dropPartition("p3"));
+ partitionNames =
+
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+ assertEquals(2, partitionNames.size());
+ assertFalse(partitionNames.contains("p3"));
+ assertThrows(NoSuchPartitionException.class, () ->
tablePartitionOperations.getPartition("p3"));
+
+ // drop non-existing partition
+ assertFalse(tablePartitionOperations.dropPartition("p3"));
+ }
+
+ @Test
+ public void testListPartition() { // exercises add/list/get/drop of list partitions on a freshly created list-partitioned table
+ String tableComment = "list_partitioned_table_comment";
+ JdbcColumn col1 =
+ JdbcColumn.builder()
+ .withName("col_1")
+ .withType(Types.IntegerType.get())
+ .withNullable(false)
+ .build();
+ JdbcColumn col2 =
+
JdbcColumn.builder().withName("col_2").withType(Types.BooleanType.get()).build();
+ JdbcColumn col3 =
+
JdbcColumn.builder().withName("col_3").withType(Types.DoubleType.get()).build();
+ JdbcColumn col4 =
+ JdbcColumn.builder()
+ .withName("col_4")
+ .withType(Types.DateType.get())
+ .withNullable(false)
+ .build();
+ List<JdbcColumn> columns = Arrays.asList(col1, col2, col3, col4);
+ Distribution distribution =
+ Distributions.hash(DEFAULT_BUCKET_SIZE, NamedReference.field("col_1"));
+ Index[] indexes = new Index[] {};
+ String listPartitionTableName =
GravitinoITUtils.genRandomName("list_partition_table");
+ Transform[] listPartition =
+ new Transform[] {Transforms.list(new String[][] {{col1.name()},
{col4.name()}})};
+ TABLE_OPERATIONS.create(
+ databaseName,
+ listPartitionTableName,
+ columns.toArray(new JdbcColumn[] {}),
+ tableComment,
+ createProperties(),
+ listPartition,
+ distribution,
+ indexes);
+
+ // assert table info: check the loaded table via assertionsTableInfo and that listTables reports it
+ JdbcTable listPartitionTable = TABLE_OPERATIONS.load(databaseName,
listPartitionTableName);
+ assertionsTableInfo(
+ listPartitionTableName,
+ tableComment,
+ columns,
+ Collections.emptyMap(),
+ null,
+ listPartition,
+ listPartitionTable);
+ List<String> listTables = TABLE_OPERATIONS.listTables(databaseName);
+ assertTrue(listTables.contains(listPartitionTableName));
+
+ // create the partition operations under test directly on top of the loaded table
+ JdbcTablePartitionOperations tablePartitionOperations =
+ new DorisTablePartitionOperations(
+ DATA_SOURCE, listPartitionTable, JDBC_EXCEPTION_CONVERTER,
TYPE_CONVERTER);
+
+ // assert partition info when there are no partitions yet: both listings must be empty
+ String[] emptyPartitionNames =
tablePartitionOperations.listPartitionNames();
+ assertEquals(0, emptyPartitionNames.length);
+ Partition[] emptyPartitions = tablePartitionOperations.listPartitions();
+ assertEquals(0, emptyPartitions.length);
+
+ // get non-existing partition: must throw NoSuchPartitionException
+ assertThrows(NoSuchPartitionException.class, () ->
tablePartitionOperations.getPartition("p1"));
+
+ // add partition with incorrect type: a range partition must be rejected with the exact message below
+ Partition incorrectType =
+ Partitions.range("p1", Literals.NULL, Literals.NULL,
Collections.emptyMap());
+ IllegalArgumentException exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> tablePartitionOperations.addPartition(incorrectType));
+ assertEquals(
+ "Table "
+ + listPartitionTableName
+ + " is non-range-partitioned, but trying to add a range partition",
+ exception.getMessage());
+
+ // add partition with incorrect value: one literal per tuple while the table is list-partitioned by two columns (col_1, col_4)
+ Partition incorrectValue =
+ Partitions.list("p1", new Literal[][] {{Literals.NULL}},
Collections.emptyMap());
+ exception =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> tablePartitionOperations.addPartition(incorrectValue));
+ assertEquals("The number of partitioning columns must be consistent",
exception.getMessage());
+
+ // add different kinds of list partitions: one (int, date) tuple each, covering {1, 2} x {today, tomorrow}; addPartition must return an equal partition
+ LocalDate today = LocalDate.now();
+ LocalDate tomorrow = today.plusDays(1);
+ Literal<LocalDate> todayLiteral = Literals.dateLiteral(today);
+ Literal<LocalDate> tomorrowLiteral = Literals.dateLiteral(tomorrow);
+ Literal[][] p1Values = {{Literals.integerLiteral(1), todayLiteral}};
+ Literal[][] p2Values = {{Literals.integerLiteral(2), todayLiteral}};
+ Literal[][] p3Values = {{Literals.integerLiteral(1), tomorrowLiteral}};
+ Literal[][] p4Values = {{Literals.integerLiteral(2), tomorrowLiteral}};
+ Partition p1 = Partitions.list("p1", p1Values, Collections.emptyMap());
+ Partition p2 = Partitions.list("p2", p2Values, Collections.emptyMap());
+ Partition p3 = Partitions.list("p3", p3Values, Collections.emptyMap());
+ Partition p4 = Partitions.list("p4", p4Values, Collections.emptyMap());
+ assertEquals(p1, tablePartitionOperations.addPartition(p1));
+ assertEquals(p2, tablePartitionOperations.addPartition(p2));
+ assertEquals(p3, tablePartitionOperations.addPartition(p3));
+ assertEquals(p4, tablePartitionOperations.addPartition(p4));
+
+ // check partitions: names first, then the value tuples returned by listPartitions and by getPartition
+ Set<String> partitionNames =
+
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+ assertEquals(4, partitionNames.size());
+ assertTrue(partitionNames.contains("p1"));
+ assertTrue(partitionNames.contains("p2"));
+ assertTrue(partitionNames.contains("p3"));
+ assertTrue(partitionNames.contains("p4"));
+
+ Map<String, ListPartition> partitions =
+ Arrays.stream(tablePartitionOperations.listPartitions())
+ .collect(Collectors.toMap(p -> p.name(), p -> (ListPartition) p));
+ assertEquals(4, partitions.size());
+ ListPartition actualP1 = partitions.get("p1");
+ assertTrue(Arrays.deepEquals(actualP1.lists(), p1Values));
+ ListPartition actualP2 = partitions.get("p2");
+ assertTrue(Arrays.deepEquals(actualP2.lists(), p2Values));
+ ListPartition actualP3 = partitions.get("p3");
+ assertTrue(Arrays.deepEquals(actualP3.lists(), p3Values));
+ ListPartition actualP4 = partitions.get("p4");
+ assertTrue(Arrays.deepEquals(actualP4.lists(), p4Values));
+
+ actualP1 = (ListPartition) tablePartitionOperations.getPartition("p1");
+ assertTrue(Arrays.deepEquals(actualP1.lists(), p1Values));
+ actualP2 = (ListPartition) tablePartitionOperations.getPartition("p2");
+ assertTrue(Arrays.deepEquals(actualP2.lists(), p2Values));
+ actualP3 = (ListPartition) tablePartitionOperations.getPartition("p3");
+ assertTrue(Arrays.deepEquals(actualP3.lists(), p3Values));
+ actualP4 = (ListPartition) tablePartitionOperations.getPartition("p4");
+ assertTrue(Arrays.deepEquals(actualP4.lists(), p4Values));
+
+ // drop partition: dropPartition returns true, and p3 disappears from the name listing and from getPartition
+ assertTrue(tablePartitionOperations.dropPartition("p3"));
+ partitionNames =
+
Arrays.stream(tablePartitionOperations.listPartitionNames()).collect(Collectors.toSet());
+ assertEquals(3, partitionNames.size());
+ assertFalse(partitionNames.contains("p3"));
+ assertThrows(NoSuchPartitionException.class, () ->
tablePartitionOperations.getPartition("p3"));
+
+ // drop non-existing partition: a second drop of p3 returns false instead of throwing
+ assertFalse(tablePartitionOperations.dropPartition("p3"));
+ }
+}
diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseTable.java
b/core/src/main/java/org/apache/gravitino/connector/BaseTable.java
index 0b0f5914c..17b2f645a 100644
--- a/core/src/main/java/org/apache/gravitino/connector/BaseTable.java
+++ b/core/src/main/java/org/apache/gravitino/connector/BaseTable.java
@@ -77,12 +77,7 @@ public abstract class BaseTable implements Table {
if (ops == null) {
TableOperations newOps = newOps();
ops =
- proxyPlugin
- .map(
- plugin -> {
- return OperationsProxy.createProxy(newOps, plugin);
- })
- .orElse(newOps);
+ proxyPlugin.map(plugin -> OperationsProxy.createProxy(newOps,
plugin)).orElse(newOps);
}
}
}