morningman commented on code in PR #31765:
URL: https://github.com/apache/doris/pull/31765#discussion_r1526076032
##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java:
##########
@@ -878,6 +878,11 @@ public Map<Long, PartitionItem> getAndCopyPartitionItems() {
         return hivePartitionValues.getIdToPartitionItem();
     }
 
+    public Long getPartitionId(String partName) {

Review Comment:
   Not used?


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java:
##########
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.analyzer;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Create unbound table sink
+ */
+public class UnboundTableSinkCreator {
+
+    /**
+     * create unbound sink
+     */
+    public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> nameParts,
+            List<String> colNames, List<String> hints, List<String> partitions, Plan query)
+            throws UserException {
+        String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
+        CatalogIf<?> curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
+        if (curCatalog instanceof InternalCatalog) {
+            return new UnboundTableSink<>(nameParts, colNames, hints, partitions, query);
+        } else if (curCatalog instanceof HMSExternalCatalog) {
+            return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, query);
+        }
+        throw new UserException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
+    }
+
+    /**

Review Comment:
   Please add a comment explaining the difference between the two methods.


##########
fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java:
##########
@@ -59,9 +85,132 @@ public DataPartition getOutputPartition() {
         return DataPartition.RANDOM;
     }
 
-    public void init() {
+    public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx) {

Review Comment:
   Please add a comment explaining the overall logic of this method.
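   For example, a summary along these lines (just a sketch derived from the hunk below; adjust the wording as needed):

   ```java
   /**
    * Translates this sink into a THiveTableSink for the BE:
    * 1. classify the insert columns as Hive partition keys or regular columns;
    * 2. attach the existing Hive partitions, bucketing info, file format and compression;
    * 3. point the writer at a per-query staging directory under the table location;
    * 4. propagate the overwrite flag from the insert context.
    */
   public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx) {
   ```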
##########
fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java:
##########
@@ -59,9 +85,132 @@ public DataPartition getOutputPartition() {
         return DataPartition.RANDOM;
     }
 
-    public void init() {
+    public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx) {
+        THiveTableSink tSink = new THiveTableSink();
+        tSink.setDbName(targetTable.getDbName());
+        tSink.setTableName(targetTable.getName());
+        Set<String> partNames = new HashSet<>(targetTable.getPartitionColumnNames());
+        Set<String> colNames = targetTable.getColumns()
+                .stream().map(Column::getName)
+                .collect(Collectors.toSet());
+        colNames.removeAll(partNames);
+        List<THiveColumn> targetColumns = new ArrayList<>();
+        for (Column col : insertCols) {
+            if (partNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.PARTITION_KEY);
+                targetColumns.add(tHiveColumn);
+            } else if (colNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.REGULAR);
+                targetColumns.add(tHiveColumn);
+            }
+        }
+        tSink.setColumns(targetColumns);
+
+        setPartitionValues(tSink);
+
+        StorageDescriptor sd = targetTable.getRemoteTable().getSd();
+        THiveBucket bucketInfo = new THiveBucket();
+        bucketInfo.setBucketedBy(sd.getBucketCols());
+        bucketInfo.setBucketCount(sd.getNumBuckets());
+        tSink.setBucketInfo(bucketInfo);
+
+        TFileFormatType formatType = getFileFormatType(sd);
+        tSink.setFileFormat(formatType);
+        setCompressType(tSink, formatType);
+
+        THiveLocationParams locationParams = new THiveLocationParams();
+        String location = sd.getLocation();
+
+        String writeTempPath = createTempPath(location);
+        locationParams.setWritePath(writeTempPath);
+        locationParams.setTargetPath(location);
+        locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+        tSink.setLocation(locationParams);
+
+        tSink.setHadoopConfig(targetTable.getHadoopProperties());
+
+        if (insertCtx.isPresent()) {
+            HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
+            tSink.setOverwrite(context.isOverwrite());
+        }
+        tDataSink = new TDataSink(getDataSinkType());
+        tDataSink.setHiveTableSink(tSink);
+    }
+
+    private String createTempPath(String location) {
+        String user = ConnectContext.get().getUserIdentity().getUser();
+        return location + "/.doris_staging/" + user + "/" + UUID.randomUUID().toString().replace("-", "");
+    }
+
+    private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
+        String compressType;
+        switch (formatType) {
+            case FORMAT_ORC:
+                compressType = targetTable.getRemoteTable().getParameters().get("orc.compress");
+                break;
+            case FORMAT_PARQUET:
+                compressType = targetTable.getRemoteTable().getParameters().get("parquet.compression");
+                break;
+            default:
+                compressType = "uncompressed";
+                break;
+        }
+
+        if ("snappy".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.SNAPPYBLOCK);
+        } else if ("lz4".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.LZ4BLOCK);
+        } else if ("lzo".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.LZO);
+        } else if ("zlib".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.ZLIB);
+        } else if ("zstd".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.ZSTD);
+        } else {
+            tSink.setCompressionType(TFileCompressType.PLAIN);

Review Comment:
   We should throw an exception for unsupported compression types instead of silently falling back to `PLAIN`.
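   For example, the tail of the chain could become something like this (a sketch: treating explicit "none"/"uncompressed" settings as legal is an assumption, and the exact exception type is up to you):

   ```java
       } else if ("zstd".equalsIgnoreCase(compressType)) {
           tSink.setCompressionType(TFileCompressType.ZSTD);
       } else if (compressType == null || "uncompressed".equalsIgnoreCase(compressType)
               || "none".equalsIgnoreCase(compressType)) {
           // the table explicitly requests no compression (or sets nothing at all)
           tSink.setCompressionType(TFileCompressType.PLAIN);
       } else {
           // unknown codec: fail loudly instead of silently writing PLAIN files
           throw new IllegalArgumentException("Unsupported compression type for Hive table sink: " + compressType);
       }
   ```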
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java:
##########
@@ -54,4 +54,8 @@ public List<Slot> computeOutput() {
                 .map(NamedExpression::toSlot)
                 .collect(ImmutableList.toImmutableList());
     }
+
+    public PhysicalProperties getRequirePhysicalProperties() {
+        return PhysicalProperties.ANY;

Review Comment:
   Double-check whether `PhysicalProperties.ANY` is the correct default value for `PhysicalSink`.


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -156,20 +160,40 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
      */
     private void insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames)
             throws Exception {
-        UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
-        UnboundTableSink<?> copySink = new UnboundTableSink<>(
-                sink.getNameParts(),
-                sink.getColNames(),
-                sink.getHints(),
-                true,
-                tempPartitionNames,
-                sink.isPartialUpdate(),
-                sink.getDMLCommandType(),
-                (LogicalPlan) (sink.child(0)));
-        // for overwrite situation, we disable auto create partition.
-        OlapInsertCommandContext insertCtx = new OlapInsertCommandContext();
-        insertCtx.setAllowAutoPartition(false);
-        InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx));
+        UnboundLogicalSink<?> copySink;
+        InsertCommandContext insertCtx;
+        if (logicalQuery instanceof UnboundTableSink) {
+            UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
+            copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
+                    sink.getNameParts(),
+                    sink.getColNames(),
+                    sink.getHints(),
+                    true,
+                    tempPartitionNames,
+                    sink.isPartialUpdate(),
+                    sink.getDMLCommandType(),
+                    (LogicalPlan) (sink.child(0)));
+            // for overwrite situation, we disable auto create partition.
+            insertCtx = new OlapInsertCommandContext();
+            ((OlapInsertCommandContext) insertCtx).setAllowAutoPartition(false);
+        } else if (logicalQuery instanceof UnboundHiveTableSink) {
+            UnboundHiveTableSink<?> sink = (UnboundHiveTableSink<?>) logicalQuery;
+            copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
+                    sink.getNameParts(),
+                    sink.getColNames(),
+                    sink.getHints(),
+                    false,
+                    sink.getPartitions(),
+                    false,
+                    sink.getDMLCommandType(),
+                    (LogicalPlan) (sink.child(0)));
+            insertCtx = new HiveInsertCommandContext();
+            ((HiveInsertCommandContext) insertCtx).setOverwrite(false);
+        } else {
+            throw new RuntimeException("Current catalog has not supported insert overwrite yet.");

Review Comment:
   ```suggestion
               throw new RuntimeException("Current catalog does not support insert overwrite.");
   ```


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java:
##########
@@ -172,15 +176,27 @@ public List<Partition> getPartitions(String dbName, String tblName, List<String>
 
     private List<Partition> getPartitionsByNames(String dbName, String tblName, List<String> partitionNames) {
         List<String> partitionNamesWithQuote = partitionNames.stream().map(partitionName -> "'" + partitionName + "'")
                 .collect(Collectors.toList());
-        String partitionNamesString = Joiner.on(", ").join(partitionNamesWithQuote);
-        String sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
-                + " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
-                + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
-                + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
-                + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
-                + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'"
-                + " AND \"PART_NAME\" in (%s);",
-                dbName, tblName, partitionNamesString);
+        String sql;
+        if (partitionNamesWithQuote.isEmpty()) {
+            sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
+                    + " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
+                    + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
+                    + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
+                    + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
+                    + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'"
+                    + " AND \"PART_NAME\";",

Review Comment:
   The query now ends with a dangling `AND "PART_NAME"` predicate. And why change this at all? It looks unrelated to this feature.
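   If the empty-name-list case really needs its own query, building the shared prefix once and appending the `IN` clause only when there are names would avoid the dangling predicate (a sketch):

   ```java
   // Sketch: share the SELECT/JOIN/WHERE prefix and add the IN filter only when needed.
   String baseSql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
           + " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
           + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
           + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
           + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
           + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'", dbName, tblName);
   String sql = partitionNamesWithQuote.isEmpty()
           ? baseSql + ";"
           : baseSql + String.format(" AND \"PART_NAME\" in (%s);",
                   Joiner.on(", ").join(partitionNamesWithQuote));
   ```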
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java:
##########
@@ -100,20 +118,50 @@ public List<? extends Expression> getExpressions() {
 
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
-        return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
-                groupExpression, getLogicalProperties(), child());
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
+                groupExpression, getLogicalProperties(), child(), hivePartitionKeys);
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
             Optional<LogicalProperties> logicalProperties, List<Plan> children) {
-        return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
-                groupExpression, logicalProperties.get(), children.get(0));
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
+                groupExpression, logicalProperties.get(), children.get(0), hivePartitionKeys);
     }
 
     @Override
     public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
-        return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
-                groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
+                groupExpression, getLogicalProperties(), physicalProperties, statistics, child(), hivePartitionKeys);
+    }
+
+    /**
+     * get output physical properties
+     */
+    @Override
+    public PhysicalProperties getRequirePhysicalProperties() {
+        Set<String> hivePartitionKeys = targetTable.getRemoteTable()
+                .getPartitionKeys().stream()
+                .map(FieldSchema::getName)
+                .collect(Collectors.toSet());
+        if (!hivePartitionKeys.isEmpty()) {
+            List<Integer> columnIdx = new ArrayList<>();
+            List<Column> fullSchema = targetTable.getFullSchema();
+            for (int i = 0; i < fullSchema.size(); i++) {
+                Column column = fullSchema.get(i);
+                if (hivePartitionKeys.contains(column.getName())) {
+                    columnIdx.add(i);
+                }
+            }
+            DistributionSpecTableSinkHashPartitioned shuffleInfo =
+                    (DistributionSpecTableSinkHashPartitioned) PhysicalProperties.SINK_HASH_PARTITIONED

Review Comment:
   `PhysicalProperties.SINK_HASH_PARTITIONED` is a singleton; can we safely modify it here?
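   A safer shape is to build a fresh spec per sink instead of down-casting and mutating the shared constant; a sketch (whether `DistributionSpecTableSinkHashPartitioned` has such a constructor, and whether `PhysicalProperties` has a public constructor taking a distribution spec, are assumptions):

   ```java
           // Sketch: carry this sink's shuffle columns in a brand-new spec,
           // leaving PhysicalProperties.SINK_HASH_PARTITIONED untouched.
           DistributionSpecTableSinkHashPartitioned shuffleInfo =
                   new DistributionSpecTableSinkHashPartitioned(columnIdx);
           return new PhysicalProperties(shuffleInfo);
   ```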
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java:
##########
@@ -345,13 +347,17 @@ private static Expression castValue(Expression value, DataType targetType) {
      * get target table from names.
      */
     public static TableIf getTargetTable(Plan plan, ConnectContext ctx) {
-        if (!(plan instanceof UnboundTableSink)) {
-            throw new AnalysisException("the root of plan should be UnboundTableSink"
-                    + " but it is " + plan.getType());
+        if (plan instanceof UnboundTableSink) {

Review Comment:
   What is the difference between these two branches?
   We should try our best to avoid `if/else` chains over different table types.
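   One way to remove the branching is to dispatch through a common supertype; a sketch (assuming `UnboundLogicalSink`, the common parent already used elsewhere in this PR, can expose `getNameParts()`, and that a `RelationUtil.getTable` style helper exists):

   ```java
   public static TableIf getTargetTable(Plan plan, ConnectContext ctx) {
       if (!(plan instanceof UnboundLogicalSink)) {
           throw new AnalysisException("the root of plan should be UnboundLogicalSink"
                   + " but it is " + plan.getType());
       }
       // Both UnboundTableSink and UnboundHiveTableSink resolve the same way,
       // so no per-type branch is needed here.
       List<String> tableQualifier = RelationUtil.getQualifierName(ctx,
               ((UnboundLogicalSink<?>) plan).getNameParts());
       return RelationUtil.getTable(tableQualifier, ctx.getEnv());
   }
   ```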
##########
fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java:
##########
@@ -59,9 +85,132 @@ public DataPartition getOutputPartition() {
         return DataPartition.RANDOM;
     }
 
-    public void init() {
+    public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx) {
+        THiveTableSink tSink = new THiveTableSink();
+        tSink.setDbName(targetTable.getDbName());
+        tSink.setTableName(targetTable.getName());
+        Set<String> partNames = new HashSet<>(targetTable.getPartitionColumnNames());
+        Set<String> colNames = targetTable.getColumns()
+                .stream().map(Column::getName)
+                .collect(Collectors.toSet());
+        colNames.removeAll(partNames);
+        List<THiveColumn> targetColumns = new ArrayList<>();
+        for (Column col : insertCols) {
+            if (partNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.PARTITION_KEY);
+                targetColumns.add(tHiveColumn);
+            } else if (colNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.REGULAR);
+                targetColumns.add(tHiveColumn);
+            }
+        }
+        tSink.setColumns(targetColumns);
+
+        setPartitionValues(tSink);
+
+        StorageDescriptor sd = targetTable.getRemoteTable().getSd();
+        THiveBucket bucketInfo = new THiveBucket();
+        bucketInfo.setBucketedBy(sd.getBucketCols());
+        bucketInfo.setBucketCount(sd.getNumBuckets());
+        tSink.setBucketInfo(bucketInfo);
+
+        TFileFormatType formatType = getFileFormatType(sd);
+        tSink.setFileFormat(formatType);
+        setCompressType(tSink, formatType);
+
+        THiveLocationParams locationParams = new THiveLocationParams();
+        String location = sd.getLocation();
+
+        String writeTempPath = createTempPath(location);
+        locationParams.setWritePath(writeTempPath);
+        locationParams.setTargetPath(location);
+        locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+        tSink.setLocation(locationParams);
+
+        tSink.setHadoopConfig(targetTable.getHadoopProperties());
+
+        if (insertCtx.isPresent()) {
+            HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
+            tSink.setOverwrite(context.isOverwrite());
+        }
+        tDataSink = new TDataSink(getDataSinkType());
+        tDataSink.setHiveTableSink(tSink);
+    }
+
+    private String createTempPath(String location) {
+        String user = ConnectContext.get().getUserIdentity().getUser();
+        return location + "/.doris_staging/" + user + "/" + UUID.randomUUID().toString().replace("-", "");
+    }
+
+    private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
+        String compressType;
+        switch (formatType) {
+            case FORMAT_ORC:
+                compressType = targetTable.getRemoteTable().getParameters().get("orc.compress");
+                break;
+            case FORMAT_PARQUET:
+                compressType = targetTable.getRemoteTable().getParameters().get("parquet.compression");
+                break;
+            default:
+                compressType = "uncompressed";
+                break;
+        }
+
+        if ("snappy".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.SNAPPYBLOCK);
+        } else if ("lz4".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.LZ4BLOCK);
+        } else if ("lzo".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.LZO);
+        } else if ("zlib".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.ZLIB);
+        } else if ("zstd".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.ZSTD);
+        } else {
+            tSink.setCompressionType(TFileCompressType.PLAIN);
+        }
+    }
+
+    private void setPartitionValues(THiveTableSink tSink) {
+        List<THivePartition> partitions = new ArrayList<>();
+        List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
+                ((HMSExternalCatalog) targetTable.getCatalog())
+                        .getClient().listPartitions(targetTable.getDbName(), targetTable.getName());
+        for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
+            THivePartition hivePartition = new THivePartition();
+            StorageDescriptor sd = partition.getSd();
+            hivePartition.setFileFormat(getFileFormatType(sd));
+
+            hivePartition.setValues(partition.getValues());
+            THiveLocationParams locationParams = new THiveLocationParams();
+            String location = sd.getLocation();
+            // pass the same write path and target path to the partition
+            locationParams.setWritePath(location);
+            locationParams.setTargetPath(location);
+            locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+            hivePartition.setLocation(locationParams);
+            partitions.add(hivePartition);
+        }
+        tSink.setPartitions(partitions);
+    }
+
+    private TFileFormatType getFileFormatType(StorageDescriptor sd) {
+        TFileFormatType fileFormatType;
+        if (sd.getInputFormat().toLowerCase().contains("orc")) {
+            fileFormatType = TFileFormatType.FORMAT_ORC;
+        } else {

Review Comment:
   This should be an `else if` checking for parquet, followed by an `else` that throws an exception for unsupported formats.
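   Concretely, something like this sketch (the exception type is a placeholder):

   ```java
   private TFileFormatType getFileFormatType(StorageDescriptor sd) {
       String inputFormat = sd.getInputFormat().toLowerCase();
       if (inputFormat.contains("orc")) {
           return TFileFormatType.FORMAT_ORC;
       } else if (inputFormat.contains("parquet")) {
           return TFileFormatType.FORMAT_PARQUET;
       } else {
           // fail loudly rather than silently picking a wrong output format
           throw new IllegalArgumentException("Unsupported hive input format: " + sd.getInputFormat());
       }
   }
   ```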