This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 2e64491ee38 [branch-2.1](insert-overwrite) Support create partition for auto partition table when insert overwrite (#38628) (#42644)
2e64491ee38 is described below

commit 2e64491ee38f4248a11e9b6bd29cd5b37523b913
Author: zclllhhjj <zhaochan...@selectdb.com>
AuthorDate: Wed Nov 13 11:16:00 2024 +0800

    [branch-2.1](insert-overwrite) Support create partition for auto partition table when insert overwrite (#38628) (#42644)
    
    pick https://github.com/apache/doris/pull/38628
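
    For illustration, a minimal sketch of the behavior this enables, taken
    from the regression tests added in this commit (auto_list is the
    auto-list-partitioned test table):

        SET enable_auto_create_when_overwrite = true;
        -- "zzz" matches no existing partition; with the variable on, a
        -- partition is created for it instead of the row being filtered:
        insert overwrite table auto_list values ("SHANGHAI"), ("zzz");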
---
 be/src/exec/tablet_info.cpp                        |   1 +
 be/src/pipeline/exec/exchange_sink_operator.cpp    |   2 +-
 be/src/vec/sink/vrow_distribution.cpp              | 177 ++++++++++++++-------
 be/src/vec/sink/vrow_distribution.h                |   9 +-
 be/src/vec/sink/writer/vtablet_writer.cpp          |   2 +-
 be/src/vec/sink/writer/vtablet_writer_v2.cpp       |   2 +-
 .../apache/doris/analysis/NativeInsertStmt.java    |   6 -
 .../doris/nereids/parser/LogicalPlanBuilder.java   |   4 +
 .../insert/InsertOverwriteTableCommand.java        |  33 ++--
 .../java/org/apache/doris/qe/SessionVariable.java  |  20 ++-
 .../java/org/apache/doris/qe/StmtExecutor.java     |  28 +---
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 .../test_iot_overwrite_and_create.out              |  24 +++
 .../test_iot_overwrite_and_create_many.out         |  15 ++
 .../test_iot_overwrite_and_create.groovy           |  71 +++++++++
 .../test_iot_overwrite_and_create_many.groovy      |  64 ++++++++
 16 files changed, 356 insertions(+), 104 deletions(-)

diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 3c934ad0831..53cabb5305f 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -729,6 +729,7 @@ Status VOlapTablePartitionParam::replace_partitions(
 
         // add new partitions with new id.
         _partitions.emplace_back(part);
+        VLOG_NOTICE << "params add new partition " << part->id;
 
         // replace items in _partition_maps
         if (_is_in_partition) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 9958a0b6fc1..b26c69ad560 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -324,7 +324,7 @@ Status ExchangeSinkLocalState::_send_new_partition_batch() {
         vectorized::Block tmp_block =
                _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
         auto& p = _parent->cast<ExchangeSinkOperatorX>();
-        // these order is only.
+        // this order is required:
         //  1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
         //  2. deal batched block
         //  3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp
index 741148b66f5..025e13edff0 100644
--- a/be/src/vec/sink/vrow_distribution.cpp
+++ b/be/src/vec/sink/vrow_distribution.cpp
@@ -23,7 +23,7 @@
 
 #include <cstdint>
 #include <memory>
-#include <sstream>
+#include <string>
 
 #include "common/status.h"
 #include "runtime/client_cache.h"
@@ -110,6 +110,10 @@ Status VRowDistribution::automatic_create_partition() {
     if (result.status.status_code == TStatusCode::OK) {
         // add new created partitions
         RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
+        for (const auto& part : result.partitions) {
+            _new_partition_ids.insert(part.id);
+            VLOG_TRACE << "record new id: " << part.id;
+        }
         RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
     }
 
@@ -128,7 +132,7 @@ static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg
 
 // use _partitions and replace them
 Status VRowDistribution::_replace_overwriting_partition() {
-    SCOPED_TIMER(_add_partition_request_timer);
+    SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition
     TReplacePartitionRequest request;
     TReplacePartitionResult result;
     request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
@@ -138,16 +142,20 @@ Status VRowDistribution::_replace_overwriting_partition() {
     // only request for partitions not recorded for replacement
     std::set<int64_t> id_deduper;
     for (const auto* part : _partitions) {
-        if (part == nullptr) [[unlikely]] {
-            return Status::InternalError(
-                    "Cannot found origin partitions in auto detect 
overwriting, stop processing");
-        }
-        if (_new_partition_ids.contains(part->id)) {
-            // this is a new partition. dont replace again.
-        } else {
-            // request for replacement
-            id_deduper.insert(part->id);
-        }
+        if (part != nullptr) {
+            if (_new_partition_ids.contains(part->id)) {
+                // this is a new partition. don't replace it again.
+                VLOG_TRACE << "skip new partition: " << part->id;
+            } else {
+                // request for replacement
+                id_deduper.insert(part->id);
+            }
+        } else if (_missing_map.empty()) {
+            // no origin partition, and creating one is not allowed.
+            return Status::InvalidArgument(
+                    "Cannot found origin partitions in auto detect 
overwriting, stop "
+                    "processing");
+        } // else: part is null and _missing_map is not empty. handled outside via the auto-partition path. nothing to do here.
     }
     if (id_deduper.empty()) {
         return Status::OK(); // no need to request
@@ -172,6 +180,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
         // record new partitions
         for (const auto& part : result.partitions) {
             _new_partition_ids.insert(part.id);
+            VLOG_TRACE << "record new id: " << part.id;
         }
         // replace data in _partitions
         RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions));
@@ -294,6 +303,52 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
     return Status::OK();
 }
 
+Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
+                                           const std::vector<uint16_t>& partition_cols_idx,
+                                           int64_t& rows_stat_val) {
+    // for missing partition keys, calc the missing partition and save in _partitions_need_create
+    auto [part_ctxs, part_exprs] = _get_partition_function();
+    auto part_col_num = part_exprs.size();
+    // the two vectors are in column-first-order
+    std::vector<std::vector<std::string>> col_strs;
+    std::vector<const NullMap*> col_null_maps;
+    col_strs.resize(part_col_num);
+    col_null_maps.reserve(part_col_num);
+
+    for (int i = 0; i < part_col_num; ++i) {
+        auto return_type = part_exprs[i]->data_type();
+        // expose the data column. the return type would be nullable
+        const auto& [range_left_col, col_const] =
+                unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
+        if (range_left_col->is_nullable()) {
+            col_null_maps.push_back(&(
+                    assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data()));
+        } else {
+            col_null_maps.push_back(nullptr);
+        }
+        for (auto row : _missing_map) {
+            col_strs[i].push_back(
+                    return_type->to_string(*range_left_col, index_check_const(row, col_const)));
+        }
+    }
+
+    // calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
+    RETURN_IF_ERROR(
+            _save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));
+
+    size_t new_bt_rows = _batching_block->rows();
+    size_t new_bt_bytes = _batching_block->bytes();
+    rows_stat_val -= new_bt_rows - _batching_rows;
+    _state->update_num_rows_load_total(_batching_rows - new_bt_rows);
+    _state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
+    DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows);
+    DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes);
+    _batching_rows = new_bt_rows;
+    _batching_bytes = new_bt_bytes;
+
+    return Status::OK();
+}
+
 Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
         vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
         bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
@@ -319,63 +374,64 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
     RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));
 
     if (!_missing_map.empty()) {
-        // for missing partition keys, calc the missing partition and save in _partitions_need_create
-        auto [part_ctxs, part_exprs] = _get_partition_function();
-        auto part_col_num = part_exprs.size();
-        // the two vectors are in column-first-order
-        std::vector<std::vector<std::string>> col_strs;
-        std::vector<const NullMap*> col_null_maps;
-        col_strs.resize(part_col_num);
-        col_null_maps.reserve(part_col_num);
-
-        for (int i = 0; i < part_col_num; ++i) {
-            auto return_type = part_exprs[i]->data_type();
-            // expose the data column. the return type would be nullable
-            const auto& [range_left_col, col_const] =
-                    unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
-            if (range_left_col->is_nullable()) {
-                col_null_maps.push_back(&(assert_cast<const ColumnNullable*>(range_left_col.get())
-                                                  ->get_null_map_data()));
-            } else {
-                col_null_maps.push_back(nullptr);
-            }
-            for (auto row : _missing_map) {
-                col_strs[i].push_back(
-                        return_type->to_string(*range_left_col, index_check_const(row, col_const)));
-            }
-        }
-
-        // calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
-        RETURN_IF_ERROR(
-                _save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));
-
-        size_t new_bt_rows = _batching_block->rows();
-        size_t new_bt_bytes = _batching_block->bytes();
-        rows_stat_val -= new_bt_rows - _batching_rows;
-        _state->update_num_rows_load_total(_batching_rows - new_bt_rows);
-        _state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
-        DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows);
-        DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes);
-        _batching_rows = new_bt_rows;
-        _batching_bytes = new_bt_bytes;
+        RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
     }
     return Status::OK();
 }
 
 Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
-        vectorized::Block* block, bool has_filtered_rows,
-        std::vector<RowPartTabletIds>& row_part_tablet_ids) {
+        vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
+        bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
+        int64_t& rows_stat_val) {
     auto num_rows = block->rows();
 
+    // for the non-auto-partition case, both branches take the 'else' path below: just find the origin partitions,
+    //  replace them by rpc, and find the new partitions to use.
+    // for the auto-partition case, find and save the origins in _partitions and replace them; meanwhile, save the
+    //  missing values for auto partition. then find partitions again to get the replaced partitions in _partitions.
+    //  this time _missing_map is ignored because we already saved the missing values.
     bool stop_processing = false;
-    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
-                                                 _tablet_indexes, stop_processing, _skip));
+    if (_vpartition->is_auto_partition() &&
+        _state->query_options().enable_auto_create_when_overwrite) {
+        // allow auto create partition for missing rows.
+        std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
+        auto partition_col = block->get_by_position(partition_keys[0]);
+        _missing_map.clear();
+        _missing_map.reserve(partition_col.column->size());
+
+        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
+                                                     _tablet_indexes, stop_processing, _skip,
+                                                     &_missing_map));
+
+        // creating partitions is allowed here and really needed during auto-detect overwriting.
+        if (!_missing_map.empty()) {
+            RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
+        }
+    } else {
+        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
+                                                     _tablet_indexes, stop_processing, _skip));
+    }
     RETURN_IF_ERROR(_replace_overwriting_partition());
 
     // regenerate locations for new partitions & tablets
     _reset_find_tablets(num_rows);
-    RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
-                                                 _tablet_indexes, stop_processing, _skip));
+    if (_vpartition->is_auto_partition() &&
+        _state->query_options().enable_auto_create_when_overwrite) {
+        // here _missing_map is just a placeholder
+        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
+                                                     _tablet_indexes, stop_processing, _skip,
+                                                     &_missing_map));
+        if (VLOG_TRACE_IS_ON) {
+            std::string tmp;
+            for (auto v : _missing_map) {
+                tmp += std::to_string(v).append(", ");
+            }
+            VLOG_TRACE << "Trace missing map of " << this << ':' << tmp;
+        }
+    } else {
+        RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
+                                                     _tablet_indexes, stop_processing, _skip));
+    }
     if (has_filtered_rows) {
         for (int i = 0; i < num_rows; i++) {
             _skip[i] = _skip[i] || _block_convertor->filter_map()[i];
@@ -446,10 +502,11 @@ Status VRowDistribution::generate_rows_distribution(
     }
 
     Status st = Status::OK();
-    if (_vpartition->is_auto_detect_overwrite()) {
+    if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
         // when overwrite, no auto create partition allowed.
-        st = _generate_rows_distribution_for_auto_overwrite(block.get(), has_filtered_rows,
-                                                            row_part_tablet_ids);
+        st = _generate_rows_distribution_for_auto_overwrite(block.get(), partition_cols_idx,
+                                                            has_filtered_rows, row_part_tablet_ids,
+                                                            rows_stat_val);
     } else if (_vpartition->is_auto_partition() && !_deal_batched) {
         st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx,
                                                             has_filtered_rows, row_part_tablet_ids,
diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h
index fffe0e3f7f1..248982c0202 100644
--- a/be/src/vec/sink/vrow_distribution.h
+++ b/be/src/vec/sink/vrow_distribution.h
@@ -162,14 +162,19 @@ private:
             vectorized::Block* block, const std::vector<uint16_t>& partition_col_idx,
             bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
             int64_t& rows_stat_val);
+    // the whole process of handling missing rows. will call _save_missing_values
+    Status _deal_missing_map(vectorized::Block* block,
+                             const std::vector<uint16_t>& partition_cols_idx,
+                             int64_t& rows_stat_val);
 
     Status _generate_rows_distribution_for_non_auto_partition(
             vectorized::Block* block, bool has_filtered_rows,
             std::vector<RowPartTabletIds>& row_part_tablet_ids);
 
     Status _generate_rows_distribution_for_auto_overwrite(
-            vectorized::Block* block, bool has_filtered_rows,
-            std::vector<RowPartTabletIds>& row_part_tablet_ids);
+            vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
+            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
+            int64_t& rows_stat_val);
     Status _replace_overwriting_partition();
 
     void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids,
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 29b05217529..6745c73d284 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1380,7 +1380,7 @@ Status VTabletWriter::_send_new_partition_batch() {
 
         Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
 
-        // these order is only.
+        // this order is required:
         //  1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
         //  2. deal batched block
         //  3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index c693e20c3a8..dbc85147fe7 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -529,7 +529,7 @@ Status VTabletWriterV2::_send_new_partition_batch() {
 
         Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref
 
-        // these order is only.
+        // this order is required:
         //  1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
         //  2. deal batched block
         //  3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index a21cfb45553..01d5e6b87d3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -165,7 +165,6 @@ public class NativeInsertStmt extends InsertStmt {
 
     boolean hasEmptyTargetColumns = false;
     private boolean allowAutoPartition = true;
-    private boolean withAutoDetectOverwrite = false;
 
     enum InsertType {
         NATIVE_INSERT("insert_"),
@@ -331,11 +330,6 @@ public class NativeInsertStmt extends InsertStmt {
         return isTransactionBegin;
     }
 
-    public NativeInsertStmt withAutoDetectOverwrite() {
-        this.withAutoDetectOverwrite = true;
-        return this;
-    }
-
     protected void preCheckAnalyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 4114200d2fa..bcd5b641a8a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -593,6 +593,10 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
         LogicalPlan plan = visitQuery(ctx.query());
         // partitionSpec may be NULL. means auto detect partition. only available when IOT
         Pair<Boolean, List<String>> partitionSpec = visitPartitionSpec(ctx.partitionSpec());
+        // partitionSpec.second :
+        // null - auto detect
+        // empty - whole table
+        // others - specific partitions
         boolean isAutoDetect = partitionSpec.second == null;
         LogicalSink<?> sink = UnboundTableSinkCreator.createUnboundTableSinkMaybeOverwrite(
                 tableName.build(),
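
For reference, a sketch of how the three partitionSpec forms above look in SQL
(table and partition names are illustrative):

    insert overwrite table t values (...);                    -- empty list: whole table
    insert overwrite table t partition(p1, p2) values (...);  -- names: specific partitions
    insert overwrite table t partition(*) values (...);       -- null: auto detect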
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
index b0faa6ba508..51f87a25889 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java
@@ -151,6 +151,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
         PhysicalTableSink<?> physicalTableSink = ((PhysicalTableSink<?>) plan.get());
         TableIf targetTable = physicalTableSink.getTargetTable();
         List<String> partitionNames;
+        boolean wholeTable = false;
         if (physicalTableSink instanceof PhysicalOlapTableSink) {
             InternalDatabaseUtil
                     .checkDatabase(((OlapTable) targetTable).getQualifiedDbName(), ConnectContext.get());
@@ -165,7 +166,10 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
             }
             ConnectContext.get().setSkipAuth(true);
             partitionNames = ((UnboundTableSink<?>) logicalQuery).getPartitions();
+            // If no specific partition to overwrite is given, it's a command to overwrite the whole table.
+            // now we execute it as overwriting every partition.
             if (CollectionUtils.isEmpty(partitionNames)) {
+                wholeTable = true;
                 partitionNames = Lists.newArrayList(targetTable.getPartitionNames());
             }
         } else {
@@ -183,9 +187,10 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
                 // When inserting, BE will call to replace partition by FrontendService. FE will register new temp
                 // partitions and return. for transactional, the replacement will really occur when insert successed,
                 // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement.
-                insertInto(ctx, executor, taskId);
+                insertIntoAutoDetect(ctx, executor, taskId);
                 insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable);
             } else {
+                // it's overwriting the whole table (as all partitions) or specific partition(s)
                 List<String> tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames);
                 if (isCancelled.get()) {
                     LOG.info("insert overwrite is cancelled before registerTask, queryId: {}",
@@ -207,7 +212,7 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
                     insertOverwriteManager.taskFail(taskId);
                     return;
                 }
-                insertInto(ctx, executor, tempPartitionNames);
+                insertIntoPartitions(ctx, executor, tempPartitionNames, wholeTable);
                 if (isCancelled.get()) {
                     LOG.info("insert overwrite is cancelled before replacePartition, queryId: {}",
                             ctx.getQueryIdentifier());
@@ -280,13 +285,15 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
     }
 
     /**
-     * insert into select. for sepecified temp partitions
+     * insert into select. for specified temp partitions or all partitions (the whole table).
      *
-     * @param ctx ctx
-     * @param executor executor
+     * @param ctx                ctx
+     * @param executor           executor
      * @param tempPartitionNames tempPartitionNames
+     * @param wholeTable         overwrite target is the whole table, not individual partitions(...)
      */
-    private void insertInto(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames)
+    private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, List<String> tempPartitionNames,
+            boolean wholeTable)
             throws Exception {
         // copy sink tot replace by tempPartitions
         UnboundLogicalSink<?> copySink;
@@ -302,9 +309,10 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
                     sink.isPartialUpdate(),
                     sink.getDMLCommandType(),
                     (LogicalPlan) (sink.child(0)));
-            // 1. for overwrite situation, we disable auto create partition.
+            // 1. when overwriting the whole table, whether to allow auto partition is controlled by session variable.
             // 2. we save and pass overwrite auto detect by insertCtx
-            insertCtx = new OlapInsertCommandContext(false);
+            boolean allowAutoPartition = wholeTable && ctx.getSessionVariable().isEnableAutoCreateWhenOverwrite();
+            insertCtx = new OlapInsertCommandContext(allowAutoPartition);
         } else if (logicalQuery instanceof UnboundHiveTableSink) {
             UnboundHiveTableSink<?> sink = (UnboundHiveTableSink<?>) logicalQuery;
             copySink = (UnboundLogicalSink<?>) UnboundTableSinkCreator.createUnboundTableSink(
@@ -343,12 +351,13 @@ public class InsertOverwriteTableCommand extends Command implements ForwardWithS
      * @param ctx ctx
      * @param executor executor
      */
-    private void insertInto(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception {
-        // 1. for overwrite situation, we disable auto create partition.
-        // 2. we save and pass overwrite auto-detected by insertCtx
+    private void insertIntoAutoDetect(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception {
         InsertCommandContext insertCtx;
         if (logicalQuery instanceof UnboundTableSink) {
-            insertCtx = new OlapInsertCommandContext(false,
+            // 1. when overwriting with auto-detect, whether to allow auto partition is controlled by session variable.
+            // 2. we save and pass overwrite auto detect by insertCtx
+            boolean allowAutoPartition = ctx.getSessionVariable().isEnableAutoCreateWhenOverwrite();
+            insertCtx = new OlapInsertCommandContext(allowAutoPartition,
                     ((UnboundTableSink<?>) logicalQuery).isAutoDetectPartition(), groupId);
         } else if (logicalQuery instanceof UnboundHiveTableSink) {
             insertCtx = new HiveInsertCommandContext();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index aa4d06c93dc..5368112bf1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -646,6 +646,12 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_COOLDOWN_REPLICA_AFFINITY =
             "enable_cooldown_replica_affinity";
+    /**
+     * Insert overwrite on an auto partition table allows creating partitions for
+     * data that cannot find a partition to overwrite.
+     */
+    public static final String ENABLE_AUTO_CREATE_WHEN_OVERWRITE = "enable_auto_create_when_overwrite";
+
     /**
     * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources.
      */
@@ -2104,7 +2110,6 @@ public class SessionVariable implements Serializable, Writable {
     })
     public boolean enableFallbackOnMissingInvertedIndex = true;
 
-
     @VariableMgr.VarAttr(name = IN_LIST_VALUE_COUNT_THRESHOLD, description = {
         "in条件value数量大于这个threshold后将不会走fast_execute",
         "When the number of values in the IN condition exceeds this threshold,"
@@ -2135,6 +2140,14 @@ public class SessionVariable implements Serializable, Writable {
     })
     public boolean requireSequenceInInsert = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_AUTO_CREATE_WHEN_OVERWRITE, description = {
+        "开启后对自动分区表的 insert overwrite 操作会对没有找到分区的插入数据按自动分区规则创建分区,默认关闭",
+        "The insert overwrite operation on an auto-partitioned table will create partitions for inserted data"
+                + " for which no partition is found according to the auto-partitioning rules, which is turned off"
+                + " by default."
+    })
+    public boolean enableAutoCreateWhenOverwrite = false;
+
     @VariableMgr.VarAttr(name = SKIP_CHECKING_ACID_VERSION_FILE, needForward = true, description = {
             "跳过检查 transactional hive 版本文件 '_orc_acid_version.'",
             "Skip checking transactional hive version file '_orc_acid_version.'"
@@ -3670,6 +3683,7 @@ public class SessionVariable implements Serializable, Writable {
         tResult.setEnableAdaptivePipelineTaskSerialReadOnLimit(enableAdaptivePipelineTaskSerialReadOnLimit);
         tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit);
         tResult.setInListValueCountThreshold(inListValueCountThreshold);
+        tResult.setEnableAutoCreateWhenOverwrite(enableAutoCreateWhenOverwrite);
         return tResult;
     }
 
@@ -4291,6 +4305,10 @@ public class SessionVariable implements Serializable, Writable {
         return this.maxMsgSizeOfResultReceiver;
     }
 
+    public boolean isEnableAutoCreateWhenOverwrite() {
+        return this.enableAutoCreateWhenOverwrite;
+    }
+
     public TSerdeDialect getSerdeDialect() {
         switch (serdeDialect) {
             case "doris":
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 831696d9ebb..b39af6efeb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -2932,7 +2932,7 @@ public class StmtExecutor {
         }
     }
 
-    private void handleIotStmt() {
+    private void handleIotStmt() throws AnalysisException {
         ConnectContext.get().setSkipAuth(true);
         try {
             InsertOverwriteTableStmt iotStmt = (InsertOverwriteTableStmt) this.parsedStmt;
@@ -2974,9 +2974,11 @@ public class StmtExecutor {
             return;
         }
         // after success create table insert data
+        // when overwriting the table, whether to allow auto partition is controlled by session variable.
+        boolean allowAutoPartition = context.getSessionVariable().isEnableAutoCreateWhenOverwrite();
         try {
             parsedStmt = new NativeInsertStmt(tmpTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()),
-                    iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true);
+                    iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), allowAutoPartition);
             parsedStmt.setUserInfo(context.getCurrentUserIdentity());
             execute();
             if (MysqlStateType.ERR.equals(context.getState().getStateType())) {
@@ -3046,6 +3048,7 @@ public class StmtExecutor {
             return;
         }
         // after success add tmp partitions
+        // when overwriting specific partitions, auto partition creation is always disallowed.
         try {
             parsedStmt = new NativeInsertStmt(targetTableName, new PartitionNames(true, tempPartitionName),
                     new LabelName(iotStmt.getDb(), iotStmt.getLabel()), iotStmt.getQueryStmt(),
@@ -3088,24 +3091,9 @@ public class StmtExecutor {
         }
     }
 
-    /*
-     * TODO: support insert overwrite auto detect partition in legacy planner
-     */
-    private void handleAutoOverwritePartition(InsertOverwriteTableStmt iotStmt) {
-        // TODO:
-        TableName targetTableName = new TableName(null, iotStmt.getDb(), iotStmt.getTbl());
-        try {
-            parsedStmt = new NativeInsertStmt(targetTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()),
-                    iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true).withAutoDetectOverwrite();
-            parsedStmt.setUserInfo(context.getCurrentUserIdentity());
-            execute();
-        } catch (Exception e) {
-            LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql(), e);
-            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
-            handleIotRollback(targetTableName);
-            return;
-        }
-
+    private void handleAutoOverwritePartition(InsertOverwriteTableStmt iotStmt) throws AnalysisException {
+        throw new AnalysisException(
+                "insert overwrite auto detect is not supported in legacy planner. use nereids instead");
     }
 
     private void handleIotRollback(TableName table) {
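
With this change the legacy planner rejects auto-detect overwrite up front
instead of attempting it. A sketch of the resulting behavior (the table name is
illustrative):

    -- under the legacy (non-Nereids) planner:
    insert overwrite table t partition(*) select ...;
    -- fails with: insert overwrite auto detect is not supported in legacy
    -- planner. use nereids instead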
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index aada77ba258..064fb3d3de4 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -332,6 +332,8 @@ struct TQueryOptions {
   132: optional i32 parallel_prepare_threshold = 0;
   133: optional i32 partition_topn_max_partitions = 1024;
   134: optional i32 partition_topn_pre_partition_rows = 1000;
+
+  137: optional bool enable_auto_create_when_overwrite = false;
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false
 }
diff --git a/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create.out b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create.out
new file mode 100644
index 00000000000..594c0cfabde
--- /dev/null
+++ b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create.out
@@ -0,0 +1,24 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !origin --
+1234567
+Beijing
+Shanghai
+list
+xxx
+
+-- !0 --
+SHANGHAI
+zzz
+
+-- !1 --
+zzz2
+
+-- !2 --
+1234567
+BEIJING
+Shanghai
+abcd
+list
+xxx
+zzz2
+
diff --git a/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create_many.out b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create_many.out
new file mode 100644
index 00000000000..b52a4ecbc1a
--- /dev/null
+++ b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create_many.out
@@ -0,0 +1,15 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql1 --
+1234567        1
+Beijing        20000
+Shanghai       20000
+list   1
+xxx    1
+zzz    20000
+
+-- !sql2 --
+Beijing        20000
+Shanghai       20000
+yyy    20000
+zzz    20000
+
diff --git a/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create.groovy b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create.groovy
new file mode 100644
index 00000000000..4d0b667dd44
--- /dev/null
+++ b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create.groovy
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one  
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information       
+// regarding copyright ownership.  The ASF licenses this file  
+// to you under the Apache License, Version 2.0 (the           
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iot_overwrite_and_create") {
+    sql "set enable_auto_create_when_overwrite = true;"
+
+    sql " drop table if exists auto_list; "
+    sql """
+        create table auto_list(
+            k0 varchar null
+        )
+        auto partition by list (k0)
+        (
+            PARTITION p1 values in (("Beijing"), ("BEIJING")),
+            PARTITION p2 values in (("Shanghai"), ("SHANGHAI")),
+            PARTITION p3 values in (("xxx"), ("XXX")),
+            PARTITION p4 values in (("list"), ("LIST")),
+            PARTITION p5 values in (("1234567"), ("7654321"))
+        )
+        DISTRIBUTED BY HASH(`k0`) BUCKETS 1
+        properties("replication_num" = "1");
+    """
+    sql """ insert into auto_list values 
("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """
+    qt_origin "select * from auto_list order by k0;"
+
+    sql """insert overwrite table auto_list values ("SHANGHAI"), ("zzz");"""
+    qt_0 "select * from auto_list order by k0;"
+    sql """insert overwrite table auto_list values ("zzz2");"""
+    qt_1 "select * from auto_list order by k0;"
+
+    test{
+        sql """insert overwrite table auto_list partition(p1, p2) values 
("zzz");"""
+        exception "Insert has filtered data in strict mode."
+    }
+    test{
+        sql """insert overwrite table auto_list partition(p3) values 
("zzz3");"""
+        exception "Insert has filtered data in strict mode."
+    }
+
+    sql """ insert into auto_list values 
("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """
+    sql """insert overwrite table auto_list partition(*) values ("abcd"), 
("BEIJING");"""
+    qt_2 "select * from auto_list order by k0;"
+
+    sql "set enable_auto_create_when_overwrite = false;"
+    test{
+        sql """insert overwrite table auto_list values ("zzz3");"""
+        exception "Insert has filtered data in strict mode."
+    }
+    test{
+        sql """insert overwrite table auto_list partition(p1, p2) values 
("zzz");"""
+        exception "Insert has filtered data in strict mode."
+    }
+    test{
+        sql """insert overwrite table auto_list partition(*) values 
("zzz3");"""
+        exception "Cannot found origin partitions in auto detect overwriting"
+    }
+}
diff --git a/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create_many.groovy b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create_many.groovy
new file mode 100644
index 00000000000..dcade3ce211
--- /dev/null
+++ b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create_many.groovy
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one  
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information       
+// regarding copyright ownership.  The ASF licenses this file  
+// to you under the Apache License, Version 2.0 (the           
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iot_overwrite_and_create_many") {
+    sql "set enable_auto_create_when_overwrite = true;"
+
+    sql " drop table if exists target; "
+    sql """
+        create table target(
+            k0 varchar null
+        )
+        auto partition by list (k0)
+        (
+            PARTITION p1 values in (("Beijing"), ("BEIJING")),
+            PARTITION p2 values in (("Shanghai"), ("SHANGHAI")),
+            PARTITION p3 values in (("xxx"), ("XXX")),
+            PARTITION p4 values in (("list"), ("LIST")),
+            PARTITION p5 values in (("1234567"), ("7654321"))
+        )
+        DISTRIBUTED BY HASH(`k0`) BUCKETS 2
+        properties("replication_num" = "1");
+    """
+    sql """ insert into target values 
("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """
+
+    sql " drop table if exists source; "
+    sql """
+        create table source(
+            k0 varchar null
+        )
+        DISTRIBUTED BY HASH(`k0`) BUCKETS 10
+        properties("replication_num" = "1");
+    """
+
+    sql """ insert into source select "Beijing" from numbers("number" = 
"20000"); """
+    sql """ insert into source select "Shanghai" from numbers("number" = 
"20000"); """
+    sql """ insert into source select "zzz" from numbers("number"= "20000"); 
"""
+    def result
+    result = sql " show partitions from target; "
+    logger.info("origin: ${result}")
+
+    sql " insert overwrite table target partition(*) select * from source; "
+    result = sql " show partitions from target; "
+    logger.info("changed: ${result}")
+
+    qt_sql1 " select k0, count(k0) from target group by k0 order by k0; "
+
+    sql """ insert into source select "yyy" from numbers("number" = "20000"); 
"""
+    sql " insert overwrite table target select * from source; "
+    qt_sql2 " select k0, count(k0) from target group by k0 order by k0; "
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

