morningman commented on code in PR #31765:
URL: https://github.com/apache/doris/pull/31765#discussion_r1526076032

##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HMSExternalTable.java:
##########
@@ -878,6 +878,11 @@ public Map<Long, PartitionItem> getAndCopyPartitionItems() {
         return hivePartitionValues.getIdToPartitionItem();
     }
 
+    public Long getPartitionId(String partName) {

Review Comment:
   Not used?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/analyzer/UnboundTableSinkCreator.java:
##########
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.analyzer;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.InternalCatalog;
+import org.apache.doris.datasource.hive.HMSExternalCatalog;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
+import org.apache.doris.nereids.util.RelationUtil;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Create unbound table sink
+ */
+public class UnboundTableSinkCreator {
+
+    /**
+     * create unbound sink
+     */
+    public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> nameParts,
+                List<String> colNames, List<String> hints, List<String> partitions, Plan query)
+            throws UserException {
+        String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
+        CatalogIf<?> curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
+        if (curCatalog instanceof InternalCatalog) {
+            return new UnboundTableSink<>(nameParts, colNames, hints, partitions, query);
+        } else if (curCatalog instanceof HMSExternalCatalog) {
+            return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, query);
+        }
+        throw new UserException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
+    }
+
+    /**

Review Comment:
   Add a comment to explain the difference between the two methods.
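
   For example, a sketch of the two Javadoc comments (the wording is my guess at the intended distinction; the second overload's extra parameters are inferred from its call sites in InsertOverwriteTableCommand below):
   ```java
   /**
    * Basic variant for a plain INSERT: picks UnboundTableSink for the internal
    * catalog and UnboundHiveTableSink for an HMS catalog.
    */

   /**
    * Extended variant: additionally threads the temp-partition flag, the
    * partial-update flag and the DMLCommandType through to the created sink.
    */
   ```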



##########
fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java:
##########
@@ -59,9 +85,132 @@ public DataPartition getOutputPartition() {
         return DataPartition.RANDOM;
     }
 
-    public void init() {
+    public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx) {

Review Comment:
   Add some comments to explain the whole logic.
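
   For reference, a sketch of what that comment could cover, taken from the method body below:
   ```java
   /**
    * Translate the target Hive table into a thrift THiveTableSink for BE:
    *  1. tag each insert column as PARTITION_KEY or REGULAR;
    *  2. fill existing partition values and bucket info from the HMS StorageDescriptor;
    *  3. derive file format and compression type from the table parameters;
    *  4. set the write path (a .doris_staging temp dir) and the final target path;
    *  5. attach the hadoop config and the overwrite flag from the insert context.
    */
   ```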



##########
fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java:
##########
@@ -59,9 +85,132 @@ public DataPartition getOutputPartition() {
         return DataPartition.RANDOM;
     }
 
-    public void init() {
+    public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx) {
+        THiveTableSink tSink = new THiveTableSink();
+        tSink.setDbName(targetTable.getDbName());
+        tSink.setTableName(targetTable.getName());
+        Set<String> partNames = new HashSet<>(targetTable.getPartitionColumnNames());
+        Set<String> colNames = targetTable.getColumns()
+                .stream().map(Column::getName)
+                .collect(Collectors.toSet());
+        colNames.removeAll(partNames);
+        List<THiveColumn> targetColumns = new ArrayList<>();
+        for (Column col : insertCols) {
+            if (partNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.PARTITION_KEY);
+                targetColumns.add(tHiveColumn);
+            } else if (colNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.REGULAR);
+                targetColumns.add(tHiveColumn);
+            }
+        }
+        tSink.setColumns(targetColumns);
+
+        setPartitionValues(tSink);
+
+        StorageDescriptor sd = targetTable.getRemoteTable().getSd();
+        THiveBucket bucketInfo = new THiveBucket();
+        bucketInfo.setBucketedBy(sd.getBucketCols());
+        bucketInfo.setBucketCount(sd.getNumBuckets());
+        tSink.setBucketInfo(bucketInfo);
+
+        TFileFormatType formatType = getFileFormatType(sd);
+        tSink.setFileFormat(formatType);
+        setCompressType(tSink, formatType);
+
+        THiveLocationParams locationParams = new THiveLocationParams();
+        String location = sd.getLocation();
+
+        String writeTempPath = createTempPath(location);
+        locationParams.setWritePath(writeTempPath);
+        locationParams.setTargetPath(location);
+        locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+        tSink.setLocation(locationParams);
+
+        tSink.setHadoopConfig(targetTable.getHadoopProperties());
+
+        if (insertCtx.isPresent()) {
+            HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
+            tSink.setOverwrite(context.isOverwrite());
+        }
+        tDataSink = new TDataSink(getDataSinkType());
+        tDataSink.setHiveTableSink(tSink);
+    }
+
+    private String createTempPath(String location) {
+        String user = ConnectContext.get().getUserIdentity().getUser();
+        return location + "/.doris_staging/" + user + "/" + UUID.randomUUID().toString().replace("-", "");
+    }
+
+    private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
+        String compressType;
+        switch (formatType) {
+            case FORMAT_ORC:
+                compressType = targetTable.getRemoteTable().getParameters().get("orc.compress");
+                break;
+            case FORMAT_PARQUET:
+                compressType = targetTable.getRemoteTable().getParameters().get("parquet.compression");
+                break;
+            default:
+                compressType = "uncompressed";
+                break;
+        }
+
+        if ("snappy".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.SNAPPYBLOCK);
+        } else if ("lz4".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.LZ4BLOCK);
+        } else if ("lzo".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.LZO);
+        } else if ("zlib".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.ZLIB);
+        } else if ("zstd".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.ZSTD);
+        } else {
+            tSink.setCompressionType(TFileCompressType.PLAIN);

Review Comment:
   Should throw an exception for unsupported compress types.
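
   For example, roughly (the exception type is an assumption; use whatever the surrounding code already declares, e.g. `UserException`):
   ```java
   } else if ("zstd".equalsIgnoreCase(compressType)) {
       tSink.setCompressionType(TFileCompressType.ZSTD);
   } else if (compressType == null || "uncompressed".equalsIgnoreCase(compressType)) {
       // only the explicit "no compression" case falls back to PLAIN
       tSink.setCompressionType(TFileCompressType.PLAIN);
   } else {
       throw new UserException("unsupported hive compression type: " + compressType);
   }
   ```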



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSink.java:
##########
@@ -54,4 +54,8 @@ public List<Slot> computeOutput() {
                 .map(NamedExpression::toSlot)
                 .collect(ImmutableList.toImmutableList());
     }
+
+    public PhysicalProperties getRequirePhysicalProperties() {
+        return PhysicalProperties.ANY;

Review Comment:
   Double check whether `PhysicalProperties.ANY` is the default value for PhysicalSink.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java:
##########
@@ -156,20 +160,40 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
      */
     private void insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames)
             throws Exception {
-        UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
-        UnboundTableSink<?> copySink = new UnboundTableSink<>(
-                sink.getNameParts(),
-                sink.getColNames(),
-                sink.getHints(),
-                true,
-                tempPartitionNames,
-                sink.isPartialUpdate(),
-                sink.getDMLCommandType(),
-                (LogicalPlan) (sink.child(0)));
-        // for overwrite situation, we disable auto create partition.
-        OlapInsertCommandContext insertCtx = new OlapInsertCommandContext();
-        insertCtx.setAllowAutoPartition(false);
-        InsertIntoTableCommand insertCommand = new InsertIntoTableCommand(copySink, labelName, Optional.of(insertCtx));
+        UnboundLogicalSink<?> copySink;
+        InsertCommandContext insertCtx;
+        if (logicalQuery instanceof UnboundTableSink) {
+            UnboundTableSink<?> sink = (UnboundTableSink<?>) logicalQuery;
+            copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
+                    sink.getNameParts(),
+                    sink.getColNames(),
+                    sink.getHints(),
+                    true,
+                    tempPartitionNames,
+                    sink.isPartialUpdate(),
+                    sink.getDMLCommandType(),
+                    (LogicalPlan) (sink.child(0)));
+            // for overwrite situation, we disable auto create partition.
+            insertCtx = new OlapInsertCommandContext();
+            ((OlapInsertCommandContext) insertCtx).setAllowAutoPartition(false);
+        } else if (logicalQuery instanceof UnboundHiveTableSink) {
+            UnboundHiveTableSink<?> sink = (UnboundHiveTableSink<?>) logicalQuery;
+            copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
+                    sink.getNameParts(),
+                    sink.getColNames(),
+                    sink.getHints(),
+                    false,
+                    sink.getPartitions(),
+                    false,
+                    sink.getDMLCommandType(),
+                    (LogicalPlan) (sink.child(0)));
+            insertCtx = new HiveInsertCommandContext();
+            ((HiveInsertCommandContext) insertCtx).setOverwrite(false);
+        } else {
+            throw new RuntimeException("Current catalog has not supported insert overwrite yet.");

Review Comment:
   ```suggestion
               throw new RuntimeException("Current catalog does not support insert overwrite.");
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PostgreSQLJdbcHMSCachedClient.java:
##########
@@ -172,15 +176,27 @@ public List<Partition> getPartitions(String dbName, String tblName, List<String>
     private List<Partition> getPartitionsByNames(String dbName, String tblName, List<String> partitionNames) {
         List<String> partitionNamesWithQuote = partitionNames.stream().map(partitionName -> "'" + partitionName + "'")
                 .collect(Collectors.toList());
-        String partitionNamesString = Joiner.on(", ").join(partitionNamesWithQuote);
-        String sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
-                        + " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
-                        + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
-                        + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
-                        + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
-                        + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'"
-                        + " AND \"PART_NAME\" in (%s);",
-                dbName, tblName, partitionNamesString);
+        String sql;
+        if (partitionNamesWithQuote.isEmpty()) {
+            sql = String.format("SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
+                            + " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
+                            + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
+                            + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
+                            + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
+                            + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\"='%s'"
+                            + " AND \"PART_NAME\";",

Review Comment:
   The last predicate is just `AND "PART_NAME"`?
   And why change this? It seems unrelated to your feature.
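
   If the empty-list case must be handled, a sketch that avoids both the duplicated SQL and the dangling predicate (variable names mirror the diff above; illustrative only):
   ```java
   StringBuilder sql = new StringBuilder(String.format(
           "SELECT \"PART_ID\", \"PARTITIONS\".\"CREATE_TIME\","
                   + " \"PARTITIONS\".\"LAST_ACCESS_TIME\","
                   + " \"PART_NAME\", \"PARTITIONS\".\"SD_ID\" FROM \"PARTITIONS\""
                   + " join \"TBLS\" on \"TBLS\".\"TBL_ID\" = \"PARTITIONS\".\"TBL_ID\""
                   + " join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\""
                   + " WHERE \"DBS\".\"NAME\" = '%s' AND \"TBLS\".\"TBL_NAME\" = '%s'",
           dbName, tblName));
   if (!partitionNamesWithQuote.isEmpty()) {
       // only append the IN filter when partition names were actually given
       sql.append(" AND \"PART_NAME\" in (")
               .append(Joiner.on(", ").join(partitionNamesWithQuote))
               .append(")");
   }
   ```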



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalHiveTableSink.java:
##########
@@ -100,20 +118,50 @@ public List<? extends Expression> getExpressions() {
 
     @Override
     public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
-        return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
-                groupExpression, getLogicalProperties(), child());
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
+                groupExpression, getLogicalProperties(), child(), hivePartitionKeys);
     }
 
     @Override
     public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
                                                  Optional<LogicalProperties> logicalProperties, List<Plan> children) {
-        return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
-                groupExpression, logicalProperties.get(), children.get(0));
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
+                groupExpression, logicalProperties.get(), children.get(0), hivePartitionKeys);
     }
 
     @Override
     public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) {
-        return new PhysicalHiveTableSink<>(database, targetTable, cols, partitionIds, outputExprs,
-                groupExpression, getLogicalProperties(), physicalProperties, statistics, child());
+        return new PhysicalHiveTableSink<>(database, targetTable, cols, outputExprs,
+                groupExpression, getLogicalProperties(), physicalProperties, statistics, child(), hivePartitionKeys);
+    }
+
+    /**
+     * get output physical properties
+     */
+    @Override
+    public PhysicalProperties getRequirePhysicalProperties() {
+        Set<String> hivePartitionKeys = targetTable.getRemoteTable()
+                .getPartitionKeys().stream()
+                .map(FieldSchema::getName)
+                .collect(Collectors.toSet());
+        if (!hivePartitionKeys.isEmpty()) {
+            List<Integer> columnIdx = new ArrayList<>();
+            List<Column> fullSchema = targetTable.getFullSchema();
+            for (int i = 0; i < fullSchema.size(); i++) {
+                Column column = fullSchema.get(i);
+                if (hivePartitionKeys.contains(column.getName())) {
+                    columnIdx.add(i);
+                }
+            }
+            DistributionSpecTableSinkHashPartitioned shuffleInfo =
+                    (DistributionSpecTableSinkHashPartitioned) PhysicalProperties.SINK_HASH_PARTITIONED

Review Comment:
   `PhysicalProperties.SINK_HASH_PARTITIONED` is a singleton; can we modify it?
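
   If mutating the singleton is unsafe, one direction is a fresh spec per sink, roughly like this (the constructor and setter here are assumptions for illustration, not the exact API):
   ```java
   // build a new distribution spec instead of casting and mutating the shared singleton
   DistributionSpecTableSinkHashPartitioned shuffleInfo = new DistributionSpecTableSinkHashPartitioned();
   shuffleInfo.setOutputColExprIds(columnIdx); // hypothetical setter carrying the shuffle columns
   return new PhysicalProperties(shuffleInfo); // wrap it in a fresh PhysicalProperties
   ```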



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertUtils.java:
##########
@@ -345,13 +347,17 @@ private static Expression castValue(Expression value, DataType targetType) {
      * get target table from names.
      */
     public static TableIf getTargetTable(Plan plan, ConnectContext ctx) {
-        if (!(plan instanceof UnboundTableSink)) {
-            throw new AnalysisException("the root of plan should be UnboundTableSink"
-                    + " but it is " + plan.getType());
+        if (plan instanceof UnboundTableSink) {

Review Comment:
   What is the difference between these two branches?
   We should try our best to avoid `if else` branching on the table type.
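
   For instance, a polymorphic shape (all names here are hypothetical, just to illustrate the direction):
   ```java
   // each unbound sink resolves its own target table, so the call site no
   // longer needs to switch on the concrete sink class
   public interface UnboundSinkWithTargetTable {
       TableIf resolveTargetTable(ConnectContext ctx);
   }

   // getTargetTable() then collapses to:
   // return ((UnboundSinkWithTargetTable) plan).resolveTargetTable(ctx);
   ```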



##########
fe/fe-core/src/main/java/org/apache/doris/planner/HiveTableSink.java:
##########
@@ -59,9 +85,132 @@ public DataPartition getOutputPartition() {
         return DataPartition.RANDOM;
     }
 
-    public void init() {
+    public void bindDataSink(List<Column> insertCols, Optional<InsertCommandContext> insertCtx) {
+        THiveTableSink tSink = new THiveTableSink();
+        tSink.setDbName(targetTable.getDbName());
+        tSink.setTableName(targetTable.getName());
+        Set<String> partNames = new HashSet<>(targetTable.getPartitionColumnNames());
+        Set<String> colNames = targetTable.getColumns()
+                .stream().map(Column::getName)
+                .collect(Collectors.toSet());
+        colNames.removeAll(partNames);
+        List<THiveColumn> targetColumns = new ArrayList<>();
+        for (Column col : insertCols) {
+            if (partNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.PARTITION_KEY);
+                targetColumns.add(tHiveColumn);
+            } else if (colNames.contains(col.getName())) {
+                THiveColumn tHiveColumn = new THiveColumn();
+                tHiveColumn.setName(col.getName());
+                tHiveColumn.setDataType(col.getType().toThrift());
+                tHiveColumn.setColumnType(THiveColumnType.REGULAR);
+                targetColumns.add(tHiveColumn);
+            }
+        }
+        tSink.setColumns(targetColumns);
+
+        setPartitionValues(tSink);
+
+        StorageDescriptor sd = targetTable.getRemoteTable().getSd();
+        THiveBucket bucketInfo = new THiveBucket();
+        bucketInfo.setBucketedBy(sd.getBucketCols());
+        bucketInfo.setBucketCount(sd.getNumBuckets());
+        tSink.setBucketInfo(bucketInfo);
+
+        TFileFormatType formatType = getFileFormatType(sd);
+        tSink.setFileFormat(formatType);
+        setCompressType(tSink, formatType);
+
+        THiveLocationParams locationParams = new THiveLocationParams();
+        String location = sd.getLocation();
+
+        String writeTempPath = createTempPath(location);
+        locationParams.setWritePath(writeTempPath);
+        locationParams.setTargetPath(location);
+        locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+        tSink.setLocation(locationParams);
+
+        tSink.setHadoopConfig(targetTable.getHadoopProperties());
+
+        if (insertCtx.isPresent()) {
+            HiveInsertCommandContext context = (HiveInsertCommandContext) insertCtx.get();
+            tSink.setOverwrite(context.isOverwrite());
+        }
+        tDataSink = new TDataSink(getDataSinkType());
+        tDataSink.setHiveTableSink(tSink);
+    }
+
+    private String createTempPath(String location) {
+        String user = ConnectContext.get().getUserIdentity().getUser();
+        return location + "/.doris_staging/" + user + "/" + UUID.randomUUID().toString().replace("-", "");
+    }
+
+    private void setCompressType(THiveTableSink tSink, TFileFormatType formatType) {
+        String compressType;
+        switch (formatType) {
+            case FORMAT_ORC:
+                compressType = targetTable.getRemoteTable().getParameters().get("orc.compress");
+                break;
+            case FORMAT_PARQUET:
+                compressType = targetTable.getRemoteTable().getParameters().get("parquet.compression");
+                break;
+            default:
+                compressType = "uncompressed";
+                break;
+        }
+
+        if ("snappy".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.SNAPPYBLOCK);
+        } else if ("lz4".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.LZ4BLOCK);
+        } else if ("lzo".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.LZO);
+        } else if ("zlib".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.ZLIB);
+        } else if ("zstd".equalsIgnoreCase(compressType)) {
+            tSink.setCompressionType(TFileCompressType.ZSTD);
+        } else {
+            tSink.setCompressionType(TFileCompressType.PLAIN);
+        }
+    }
+
+    private void setPartitionValues(THiveTableSink tSink) {
+        List<THivePartition> partitions = new ArrayList<>();
+        List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions =
+                ((HMSExternalCatalog) targetTable.getCatalog())
+                        .getClient().listPartitions(targetTable.getDbName(), targetTable.getName());
+        for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
+            THivePartition hivePartition = new THivePartition();
+            StorageDescriptor sd = partition.getSd();
+            hivePartition.setFileFormat(getFileFormatType(sd));
+
+            hivePartition.setValues(partition.getValues());
+            THiveLocationParams locationParams = new THiveLocationParams();
+            String location = sd.getLocation();
+            // pass the same write path and target path to the partition
+            locationParams.setWritePath(location);
+            locationParams.setTargetPath(location);
+            locationParams.setFileType(LocationPath.getTFileTypeForBE(location));
+            hivePartition.setLocation(locationParams);
+            partitions.add(hivePartition);
+        }
+        tSink.setPartitions(partitions);
+    }
+
+    private TFileFormatType getFileFormatType(StorageDescriptor sd) {
+        TFileFormatType fileFormatType;
+        if (sd.getInputFormat().toLowerCase().contains("orc")) {
+            fileFormatType = TFileFormatType.FORMAT_ORC;
+        } else {

Review Comment:
   Use `else if` for parquet, and throw an exception in the final `else` for unknown formats.
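
   i.e. roughly (the exception type is an assumption; match whatever the callers can handle):
   ```java
   private TFileFormatType getFileFormatType(StorageDescriptor sd) throws UserException {
       String inputFormat = sd.getInputFormat().toLowerCase();
       if (inputFormat.contains("orc")) {
           return TFileFormatType.FORMAT_ORC;
       } else if (inputFormat.contains("parquet")) {
           return TFileFormatType.FORMAT_PARQUET;
       } else {
           throw new UserException("unsupported hive table input format: " + sd.getInputFormat());
       }
   }
   ```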



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

