This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d8d9f0a0de4 [Feature-WIP](iceberg-writer) Implements iceberg partition transform. (#36289)
d8d9f0a0de4 is described below

commit d8d9f0a0de486eb29b1f31f9b9fdb35bf54cc8af
Author: kang <35803862+ghkan...@users.noreply.github.com>
AuthorDate: Sat Jun 22 20:54:15 2024 +0800

    [Feature-WIP](iceberg-writer) Implements iceberg partition transform. (#36289)
    
    #31442
    
    Added an Iceberg writer so that Doris can write data directly into the lake:
    1. Support INSERT INTO an Iceberg table by appending HDFS files.
    2. Implement Iceberg partition routing through partitionTransform:
    2.1) Serialize the partition spec and schema to JSON on the FE side, then
    deserialize them on the BE side to obtain the schema and partition
    information of the Iceberg table.
    2.2) Implement Iceberg's Identity, Bucket, Year/Month/Day and other
    partition strategies through partitionTransform and template classes
    (see the sketch after this list).
    3. Manage transactions through IcebergTransaction:
    3.1) After the BE side finishes writing files, it reports CommitData to the
    FE at partition granularity.
    3.2) After receiving the CommitData, the FE commits the metadata to Iceberg
    inside IcebergTransaction.
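    
    As a rough orientation, the per-value arithmetic applied by the truncate and
    bucket transforms in the BE diff below boils down to the following minimal,
    standalone sketch (murmur3_32 stands in for Doris's HashUtil::murmur_hash3_32;
    the function names here are illustrative and not part of the patch):
    
        #include <cstdint>
        
        // Truncate: floor the value down to a multiple of `width` (negatives included),
        // i.e. value - (((value % width) + width) % width) as in the transforms below.
        int64_t truncate_partition(int64_t value, int32_t width) {
            return value - (((value % width) + width) % width);
        }
        
        // Bucket: hash the 8-byte value and map the non-negative part of the hash onto
        // a bucket, mirroring ((hash_value >> 1) & INT32_MAX) % _bucket_num below.
        int32_t bucket_partition(int64_t value, int32_t bucket_num,
                                 uint32_t (*murmur3_32)(const void*, int, uint32_t)) {
            uint32_t hash = murmur3_32(&value, sizeof(value), 0);
            return static_cast<int32_t>(((hash >> 1) & INT32_MAX) % bucket_num);
        }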
    
    ### Future work
    - Add unit tests for the partition transform functions.
    - Implement the partition transform functions with the exchange sink turned on.
    - The partition transform functions currently omit handling of the bigint type.
    
    ---------
    
    Co-authored-by: lik40 <li...@chinatelecom.cn>
    Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
---
 be/src/util/bit_util.h                             |   22 +
 .../sink/writer/iceberg/partition_transformers.cpp |  168 ++-
 .../sink/writer/iceberg/partition_transformers.h   | 1275 +++++++++++++++++++-
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |   22 +-
 .../apache/doris/common/info/SimpleTableInfo.java  |   66 +
 .../datasource/iceberg/IcebergMetadataCache.java   |   19 +-
 .../datasource/iceberg/IcebergMetadataOps.java     |    5 +
 .../datasource/iceberg/IcebergTransaction.java     |  211 ++--
 .../doris/datasource/iceberg/IcebergUtils.java     |   64 +-
 .../iceberg/helper/IcebergWriterHelper.java        |   91 ++
 .../iceberg/source/IcebergApiSource.java           |    2 +-
 .../iceberg/source/IcebergHMSSource.java           |    4 +-
 .../datasource/statistics/CommonStatistics.java    |   81 ++
 .../commands/insert/IcebergInsertExecutor.java     |   28 +-
 .../org/apache/doris/planner/IcebergTableSink.java |    2 +-
 .../transaction/IcebergTransactionManager.java     |    7 +-
 .../datasource/iceberg/IcebergTransactionTest.java |  139 ++-
 17 files changed, 1979 insertions(+), 227 deletions(-)

diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h
index 230134ade09..6934f45ef3e 100644
--- a/be/src/util/bit_util.h
+++ b/be/src/util/bit_util.h
@@ -98,6 +98,28 @@ public:
         return (v << n) >> n;
     }
 
+    template <typename T>
+    static std::string IntToByteBuffer(T input) {
+        std::string buffer;
+        T value = input;
+        for (int i = 0; i < sizeof(value); ++i) {
+            // Applies a mask for a byte range on the input.
+            char value_to_save = value & 0XFF;
+            buffer.push_back(value_to_save);
+            // Remove the just processed part from the input so that we can exit early if there
+            // is nothing left to process.
+            value >>= 8;
+            if (value == 0 && value_to_save >= 0) {
+                break;
+            }
+            if (value == -1 && value_to_save < 0) {
+                break;
+            }
+        }
+        std::reverse(buffer.begin(), buffer.end());
+        return buffer;
+    }
+
     // Returns ceil(log2(x)).
     // TODO: this could be faster if we use __builtin_clz.  Fix this if this ever shows up
     // in a hot path.
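
A note on the helper added above: IntToByteBuffer produces a minimal-length,
big-endian two's-complement encoding of an integer (the same shape as Java's
BigInteger.toByteArray), which the decimal bucket transform further down hashes
with murmur_hash3_32. A small usage sketch, assuming the helper is reachable as
doris::BitUtil as the hunk suggests:

    #include <cassert>
    #include <string>
    #include "util/bit_util.h" // doris::BitUtil, extended in the hunk above

    void int_to_byte_buffer_examples() {
        using doris::BitUtil;
        // Small magnitudes collapse to a single byte; 256 needs two bytes (0x01 0x00).
        assert(BitUtil::IntToByteBuffer<int32_t>(1) == std::string("\x01", 1));
        assert(BitUtil::IntToByteBuffer<int32_t>(-1) == std::string("\xff", 1));
        assert(BitUtil::IntToByteBuffer<int32_t>(256) == std::string("\x01\x00", 2));
    }
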
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
index 0faebea6295..6b2a3305b9e 100644
--- a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
+++ b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp
@@ -25,6 +25,9 @@
 namespace doris {
 namespace vectorized {
 
+const std::chrono::time_point<std::chrono::system_clock> PartitionColumnTransformUtils::EPOCH =
+        std::chrono::system_clock::from_time_t(0);
+
 std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create(
         const doris::iceberg::PartitionField& field, const TypeDescriptor& source_type) {
     auto& transform = field.transform();
@@ -33,23 +36,98 @@ std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create(
 
     if (std::regex_match(transform, width_match, hasWidth)) {
         std::string name = width_match[1];
-        //int parsed_width = std::stoi(width_match[2]);
+        int parsed_width = std::stoi(width_match[2]);
 
         if (name == "truncate") {
             switch (source_type.type) {
+            case TYPE_INT: {
+                return 
std::make_unique<IntegerTruncatePartitionColumnTransform>(source_type,
+                                                                               
  parsed_width);
+            }
+            case TYPE_BIGINT: {
+                return 
std::make_unique<BigintTruncatePartitionColumnTransform>(source_type,
+                                                                               
 parsed_width);
+            }
+            case TYPE_VARCHAR:
+            case TYPE_CHAR:
+            case TYPE_STRING: {
+                return 
std::make_unique<StringTruncatePartitionColumnTransform>(source_type,
+                                                                               
 parsed_width);
+            }
+            case TYPE_DECIMALV2: {
+                return 
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V2>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL32: {
+                return 
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal32>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL64: {
+                return 
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal64>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL128I: {
+                return 
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V3>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL256: {
+                return 
std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal256>>(
+                        source_type, parsed_width);
+            }
             default: {
-                throw doris::Exception(
-                        doris::ErrorCode::INTERNAL_ERROR,
-                        "Unsupported type for truncate partition column 
transform {}",
-                        source_type.debug_string());
+                throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                       "Unsupported type {} for partition 
column transform {}",
+                                       source_type.debug_string(), transform);
             }
             }
         } else if (name == "bucket") {
             switch (source_type.type) {
+            case TYPE_INT: {
+                return 
std::make_unique<IntBucketPartitionColumnTransform>(source_type,
+                                                                           
parsed_width);
+            }
+            case TYPE_BIGINT: {
+                return 
std::make_unique<BigintBucketPartitionColumnTransform>(source_type,
+                                                                              
parsed_width);
+            }
+            case TYPE_VARCHAR:
+            case TYPE_CHAR:
+            case TYPE_STRING: {
+                return 
std::make_unique<StringBucketPartitionColumnTransform>(source_type,
+                                                                              
parsed_width);
+            }
+            case TYPE_DATEV2: {
+                return 
std::make_unique<DateBucketPartitionColumnTransform>(source_type,
+                                                                            
parsed_width);
+            }
+            case TYPE_DATETIMEV2: {
+                return 
std::make_unique<TimestampBucketPartitionColumnTransform>(source_type,
+                                                                               
  parsed_width);
+            }
+            case TYPE_DECIMALV2: {
+                return 
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V2>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL32: {
+                return 
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal32>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL64: {
+                return 
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal64>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL128I: {
+                return 
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V3>>(
+                        source_type, parsed_width);
+            }
+            case TYPE_DECIMAL256: {
+                return 
std::make_unique<DecimalBucketPartitionColumnTransform<Decimal256>>(
+                        source_type, parsed_width);
+            }
             default: {
                 throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                                       "Unsupported type for bucket partition 
column transform {}",
-                                       source_type.debug_string());
+                                       "Unsupported type {} for partition 
column transform {}",
+                                       source_type.debug_string(), transform);
             }
             }
         }
@@ -57,14 +135,79 @@ std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create(
 
     if (transform == "identity") {
         return std::make_unique<IdentityPartitionColumnTransform>(source_type);
+    } else if (transform == "year") {
+        switch (source_type.type) {
+        case TYPE_DATEV2: {
+            return 
std::make_unique<DateYearPartitionColumnTransform>(source_type);
+        }
+        case TYPE_DATETIMEV2: {
+            return 
std::make_unique<TimestampYearPartitionColumnTransform>(source_type);
+        }
+        default: {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Unsupported type {} for partition column 
transform {}",
+                                   source_type.debug_string(), transform);
+        }
+        }
+    } else if (transform == "month") {
+        switch (source_type.type) {
+        case TYPE_DATEV2: {
+            return 
std::make_unique<DateMonthPartitionColumnTransform>(source_type);
+        }
+        case TYPE_DATETIMEV2: {
+            return 
std::make_unique<TimestampMonthPartitionColumnTransform>(source_type);
+        }
+        default: {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Unsupported type {} for partition column 
transform {}",
+                                   source_type.debug_string(), transform);
+        }
+        }
+    } else if (transform == "day") {
+        switch (source_type.type) {
+        case TYPE_DATEV2: {
+            return 
std::make_unique<DateDayPartitionColumnTransform>(source_type);
+        }
+        case TYPE_DATETIMEV2: {
+            return 
std::make_unique<TimestampDayPartitionColumnTransform>(source_type);
+        }
+        default: {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Unsupported type {} for partition column 
transform {}",
+                                   source_type.debug_string(), transform);
+        }
+        }
+    } else if (transform == "hour") {
+        switch (source_type.type) {
+        case TYPE_DATETIMEV2: {
+            return 
std::make_unique<TimestampHourPartitionColumnTransform>(source_type);
+        }
+        default: {
+            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
+                                   "Unsupported type {} for partition column 
transform {}",
+                                   source_type.debug_string(), transform);
+        }
+        }
+    } else if (transform == "void") {
+        return std::make_unique<VoidPartitionColumnTransform>(source_type);
     } else {
         throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                               "Unsupported partition column transform: {}.", 
transform);
+                               "Unsupported type {} for partition column 
transform {}",
+                               source_type.debug_string(), transform);
     }
 }
 
+std::string PartitionColumnTransform::name() const {
+    return "default";
+}
+
 std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& type,
                                                        const std::any& value) const {
+    return get_partition_value(type, value);
+}
+
+std::string PartitionColumnTransform::get_partition_value(const TypeDescriptor& type,
+                                                           const std::any& value) const {
     if (value.has_value()) {
         switch (type.type) {
         case TYPE_BOOLEAN: {
@@ -131,19 +274,12 @@ std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& type
         }
         default: {
             throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
-                                   "Unsupported partition column transform: {}",
-                                   type.debug_string());
+                                   "Unsupported type {} for partition", type.debug_string());
         }
         }
     }
     return "null";
 }
 
-ColumnWithTypeAndName IdentityPartitionColumnTransform::apply(Block& block, int idx) {
-    const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(idx);
-    return {column_with_type_and_name.column, column_with_type_and_name.type,
-            column_with_type_and_name.name};
-}
-
 } // namespace vectorized
 } // namespace doris
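
For orientation, a hypothetical call pattern for the factory and transforms above
(the real call site is be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp,
whose diff is not reproduced in full here; route_partition_column is an
illustrative name, not part of the patch):

    #include <memory>
    #include "vec/sink/writer/iceberg/partition_transformers.h"

    namespace doris::vectorized {

    ColumnWithTypeAndName route_partition_column(Block& block, int column_pos,
                                                 const doris::iceberg::PartitionField& field,
                                                 const TypeDescriptor& source_type) {
        // Pick the concrete transform (identity/truncate/bucket/year/...) for this field.
        std::unique_ptr<PartitionColumnTransform> transform =
                PartitionColumnTransforms::create(field, source_type);
        // apply() derives the partition-value column used for routing; to_human_string()
        // is later used to render a single partition value for the partition path.
        return transform->apply(block, column_pos);
    }

    } // namespace doris::vectorized
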
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.h b/be/src/vec/sink/writer/iceberg/partition_transformers.h
index bfa2c43d2e5..7db8bfb1886 100644
--- a/be/src/vec/sink/writer/iceberg/partition_transformers.h
+++ b/be/src/vec/sink/writer/iceberg/partition_transformers.h
@@ -43,25 +43,81 @@ public:
             const doris::iceberg::PartitionField& field, const TypeDescriptor& source_type);
 };
 
+class PartitionColumnTransformUtils {
+public:
+    static DateV2Value<DateV2ValueType>& epoch_date() {
+        static DateV2Value<DateV2ValueType> epoch_date;
+        static bool initialized = false;
+        if (!initialized) {
+            epoch_date.from_date_str("1970-01-01 00:00:00", 19);
+            initialized = true;
+        }
+        return epoch_date;
+    }
+
+    static DateV2Value<DateTimeV2ValueType>& epoch_datetime() {
+        static DateV2Value<DateTimeV2ValueType> epoch_datetime;
+        static bool initialized = false;
+        if (!initialized) {
+            epoch_datetime.from_date_str("1970-01-01 00:00:00", 19);
+            initialized = true;
+        }
+        return epoch_datetime;
+    }
+
+    static std::string human_year(int year_ordinal) {
+        auto year = std::chrono::year_month_day(
+                            std::chrono::sys_days(std::chrono::floor<std::chrono::days>(
+                                    EPOCH + std::chrono::years(year_ordinal))))
+                            .year();
+        return std::to_string(static_cast<int>(year));
+    }
+
+    static std::string human_month(int month_ordinal) {
+        auto ymd = std::chrono::year_month_day(std::chrono::sys_days(
+                std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::months(month_ordinal))));
+        return fmt::format("{:04d}-{:02d}", static_cast<int>(ymd.year()),
+                           static_cast<unsigned>(ymd.month()));
+    }
+
+    static std::string human_day(int day_ordinal) {
+        auto ymd = std::chrono::year_month_day(std::chrono::sys_days(
+                std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::days(day_ordinal))));
+        return fmt::format("{:04d}-{:02d}-{:02d}", static_cast<int>(ymd.year()),
+                           static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day()));
+    }
+
+    static std::string human_hour(int hour_ordinal) {
+        int day_value = hour_ordinal / 24;
+        int hour_value = hour_ordinal % 24;
+        auto ymd = std::chrono::year_month_day(std::chrono::sys_days(
+                std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::days(day_value))));
+        return fmt::format("{:04d}-{:02d}-{:02d}-{:02d}", static_cast<int>(ymd.year()),
+                           static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day()),
+                           hour_value);
+    }
+
+private:
+    static const std::chrono::time_point<std::chrono::system_clock> EPOCH;
+    PartitionColumnTransformUtils() = default;
+};
+
 class PartitionColumnTransform {
 public:
     PartitionColumnTransform() = default;
 
     virtual ~PartitionColumnTransform() = default;
 
-    virtual bool preserves_non_null() const { return false; }
-
-    virtual bool monotonic() const { return true; }
-
-    virtual bool temporal() const { return false; }
+    virtual std::string name() const;
 
     virtual const TypeDescriptor& get_result_type() const = 0;
 
-    virtual bool is_void() const { return false; }
-
-    virtual ColumnWithTypeAndName apply(Block& block, int idx) = 0;
+    virtual ColumnWithTypeAndName apply(Block& block, int column_pos) = 0;
 
     virtual std::string to_human_string(const TypeDescriptor& type, const std::any& value) const;
+
+    virtual std::string get_partition_value(const TypeDescriptor& type,
+                                            const std::any& value) const;
 };
 
 class IdentityPartitionColumnTransform : public PartitionColumnTransform {
@@ -69,12 +125,1211 @@ public:
     IdentityPartitionColumnTransform(const TypeDescriptor& source_type)
             : _source_type(source_type) {}
 
-    virtual const TypeDescriptor& get_result_type() const { return _source_type; }
+    std::string name() const override { return "Identity"; }
+
+    const TypeDescriptor& get_result_type() const override { return _source_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        return {column_with_type_and_name.column, column_with_type_and_name.type,
+                column_with_type_and_name.name};
+    }
+
+private:
+    TypeDescriptor _source_type;
+};
+
+class StringTruncatePartitionColumnTransform : public PartitionColumnTransform 
{
+public:
+    StringTruncatePartitionColumnTransform(const TypeDescriptor& source_type, 
int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "StringTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_source_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        auto int_type = std::make_shared<DataTypeInt32>();
+        size_t num_columns_without_result = block.columns();
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+
+        ColumnPtr string_column_ptr;
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (auto* nullable_column =
+                    
check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) {
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            string_column_ptr = nullable_column->get_nested_column_ptr();
+            is_nullable = true;
+        } else {
+            string_column_ptr = column_with_type_and_name.column;
+            is_nullable = false;
+        }
+        block.replace_by_position(column_pos, std::move(string_column_ptr));
+        block.insert(
+                {int_type->create_column_const(block.rows(), to_field(1)), 
int_type, "const 1"});
+        block.insert({int_type->create_column_const(block.rows(), 
to_field(_width)), int_type,
+                      fmt::format("const {}", _width)});
+        block.insert({nullptr, std::make_shared<DataTypeString>(), "result"});
+        ColumnNumbers temp_arguments(3);
+        temp_arguments[0] = column_pos;                     // str column
+        temp_arguments[1] = num_columns_without_result;     // pos
+        temp_arguments[2] = num_columns_without_result + 1; // width
+        size_t result_column_id = num_columns_without_result + 2;
+
+        SubstringUtil::substring_execute(block, temp_arguments, 
result_column_id, block.rows());
+        if (is_nullable) {
+            auto res_column = 
ColumnNullable::create(block.get_by_position(result_column_id).column,
+                                                     null_map_column_ptr);
+            Block::erase_useless_column(&block, num_columns_without_result);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            auto res_column = block.get_by_position(result_column_id).column;
+            Block::erase_useless_column(&block, num_columns_without_result);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+class IntegerTruncatePartitionColumnTransform : public 
PartitionColumnTransform {
+public:
+    IntegerTruncatePartitionColumnTransform(const TypeDescriptor& source_type, 
int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "IntegerTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_source_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnInt32*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const int* end_in = in_data.data() + in_data.size();
+        const Int32* __restrict p_in = in_data.data();
+        Int32* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            *p_out = *p_in - ((*p_in % _width) + _width) % _width;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {res_column,
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+class BigintTruncatePartitionColumnTransform : public PartitionColumnTransform 
{
+public:
+    BigintTruncatePartitionColumnTransform(const TypeDescriptor& source_type, 
int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "BigintTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_source_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnInt64*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt64::create();
+        ColumnInt64::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const Int64* end_in = in_data.data() + in_data.size();
+        const Int64* __restrict p_in = in_data.data();
+        Int64* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            *p_out = *p_in - ((*p_in % _width) + _width) % _width;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {res_column,
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+template <typename T>
+class DecimalTruncatePartitionColumnTransform : public 
PartitionColumnTransform {
+public:
+    DecimalTruncatePartitionColumnTransform(const TypeDescriptor& source_type, 
int width)
+            : _source_type(source_type), _width(width) {}
+
+    std::string name() const override { return "DecimalTruncate"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_source_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+
+        ColumnPtr column_ptr;
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (auto* nullable_column =
+                    
check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) {
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+            is_nullable = true;
+        } else {
+            column_ptr = column_with_type_and_name.column;
+            is_nullable = false;
+        }
+
+        const auto* const decimal_col = 
check_and_get_column<ColumnDecimal<T>>(column_ptr);
+        const auto& vec_src = decimal_col->get_data();
+
+        auto col_res = ColumnDecimal<T>::create(vec_src.size(), 
decimal_col->get_scale());
+        auto& vec_res = col_res->get_data();
+
+        const typename T::NativeType* __restrict p_in =
+                reinterpret_cast<const T::NativeType*>(vec_src.data());
+        const typename T::NativeType* end_in =
+                reinterpret_cast<const T::NativeType*>(vec_src.data()) + 
vec_src.size();
+        typename T::NativeType* __restrict p_out = 
reinterpret_cast<T::NativeType*>(vec_res.data());
+
+        while (p_in < end_in) {
+            typename T::NativeType remainder = ((*p_in % _width) + _width) % 
_width;
+            *p_out = *p_in - remainder;
+            ++p_in;
+            ++p_out;
+        }
+
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {res_column,
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _width;
+};
+
+class IntBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    IntBucketPartitionColumnTransform(const TypeDescriptor& source_type, int 
bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), 
_target_type(TYPE_INT) {}
+
+    std::string name() const override { return "IntBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnInt32*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const int* end_in = in_data.data() + in_data.size();
+        const Int32* __restrict p_in = in_data.data();
+        Int32* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            Int64 long_value = static_cast<Int64>(*p_in);
+            uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, 
sizeof(long_value), 0);
+            *p_out = ((hash_value >> 1) & INT32_MAX) % _bucket_num;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+class BigintBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    BigintBucketPartitionColumnTransform(const TypeDescriptor& source_type, 
int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), 
_target_type(TYPE_INT) {}
+
+    std::string name() const override { return "BigintBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnInt64*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt64::create();
+        ColumnInt64::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const Int64* end_in = in_data.data() + in_data.size();
+        const Int64* __restrict p_in = in_data.data();
+        Int64* __restrict p_out = out_data.data();
+        while (p_in < end_in) {
+            Int64 long_value = static_cast<Int64>(*p_in);
+            uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, 
sizeof(long_value), 0);
+            int value = ((hash_value >> 1) & INT32_MAX) % _bucket_num;
+            *p_out = value;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+template <typename T>
+class DecimalBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DecimalBucketPartitionColumnTransform(const TypeDescriptor& source_type, 
int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), 
_target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DecimalBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDecimal<T>*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        auto& vec_res = col_res->get_data();
+
+        const typename T::NativeType* __restrict p_in =
+                reinterpret_cast<const T::NativeType*>(in_data.data());
+        const typename T::NativeType* end_in =
+                reinterpret_cast<const T::NativeType*>(in_data.data()) + 
in_data.size();
+        typename T::NativeType* __restrict p_out = 
reinterpret_cast<T::NativeType*>(vec_res.data());
+
+        while (p_in < end_in) {
+            std::string buffer = BitUtil::IntToByteBuffer(*p_in);
+            uint32_t hash_value = HashUtil::murmur_hash3_32(buffer.data(), 
buffer.size(), 0);
+            *p_out = ((hash_value >> 1) & INT32_MAX) % _bucket_num;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        return get_partition_value(type, value);
+    }
+
+    std::string get_partition_value(const TypeDescriptor& type,
+                                    const std::any& value) const override {
+        if (value.has_value()) {
+            return std::to_string(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+class DateBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateBucketPartitionColumnTransform(const TypeDescriptor& source_type, int 
bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), 
_target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const auto* end_in = in_data.data() + in_data.size();
+
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateV2ValueType> value =
+                    binary_cast<uint32_t, 
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
+
+            int32_t days_from_unix_epoch = value.daynr() - 719528;
+            Int64 long_value = static_cast<Int64>(days_from_unix_epoch);
+            uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, 
sizeof(long_value), 0);
+
+            *p_out = ((hash_value >> 1) & INT32_MAX) % _bucket_num;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+class TimestampBucketPartitionColumnTransform : public 
PartitionColumnTransform {
+public:
+    TimestampBucketPartitionColumnTransform(const TypeDescriptor& source_type, 
int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), 
_target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const auto* end_in = in_data.data() + in_data.size();
+
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, 
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+
+            int64_t timestamp;
+            if (!value.unix_timestamp(&timestamp, "UTC")) {
+                LOG(WARNING) << "Failed to call unix_timestamp :" << 
value.debug_string();
+                timestamp = 0;
+            }
+            Int64 long_value = static_cast<Int64>(timestamp) * 1000000;
+            uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, 
sizeof(long_value), 0);
+
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (value.has_value()) {
+            return std::to_string(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+class StringBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    StringBucketPartitionColumnTransform(const TypeDescriptor& source_type, 
int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), 
_target_type(TYPE_INT) {}
+
+    std::string name() const override { return "StringBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto* str_col = assert_cast<const 
ColumnString*>(column_ptr.get());
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        const auto& data = str_col->get_chars();
+        const auto& offsets = str_col->get_offsets();
+
+        size_t offset_size = offsets.size();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(offset_size);
+        auto* __restrict p_out = out_data.data();
+
+        for (int i = 0; i < offset_size; i++) {
+            const unsigned char* raw_str = &data[offsets[i - 1]];
+            ColumnString::Offset size = offsets[i] - offsets[i - 1];
+            uint32_t hash_value = HashUtil::murmur_hash3_32(raw_str, size, 0);
+
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+class DateYearPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateYearPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateYear"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+
+        const auto* end_in = in_data.data() + in_data.size();
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateV2ValueType> value =
+                    binary_cast<uint32_t, 
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
+            *p_out = 
datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_date(), value);
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& 
value) const override {
+        if (value.has_value()) {
+            return 
PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+class TimestampYearPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampYearPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampYear"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+
+        const auto* end_in = in_data.data() + in_data.size();
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, 
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+            *p_out = 
datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_datetime(), value);
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& 
value) const override {
+        if (value.has_value()) {
+            return 
PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+class DateMonthPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateMonthPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateMonth"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+
+        const auto* end_in = in_data.data() + in_data.size();
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateV2ValueType> value =
+                    binary_cast<uint32_t, 
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
+            *p_out = 
datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_date(), value);
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& 
value) const override {
+        if (value.has_value()) {
+            return 
PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+class TimestampMonthPartitionColumnTransform : public PartitionColumnTransform 
{
+public:
+    TimestampMonthPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampMonth"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+
+        const auto* end_in = in_data.data() + in_data.size();
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, 
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+            *p_out = 
datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_datetime(), value);
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& 
value) const override {
+        if (value.has_value()) {
+            return 
PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+class DateDayPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    DateDayPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "DateDay"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+
+        const auto* end_in = in_data.data() + in_data.size();
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateV2ValueType> value =
+                    binary_cast<uint32_t, 
DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);
+            *p_out = 
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_date(), value);
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& 
value) const override {
+        return get_partition_value(type, value);
+    }
+
+    std::string get_partition_value(const TypeDescriptor& type,
+                                    const std::any& value) const override {
+        if (value.has_value()) {
+            int day_value = std::any_cast<Int32>(value);
+            return PartitionColumnTransformUtils::human_day(day_value);
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+class TimestampDayPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampDayPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampDay"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+
+        const auto* end_in = in_data.data() + in_data.size();
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, 
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+            *p_out = 
datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_datetime(), value);
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& 
value) const override {
+        return get_partition_value(type, value);
+    }
+
+    std::string get_partition_value(const TypeDescriptor& type,
+                                    const std::any& value) const override {
+        if (value.has_value()) {
+            return 
PartitionColumnTransformUtils::human_day(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+class TimestampHourPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    TimestampHourPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "TimestampHour"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
+        ColumnPtr column_ptr = 
column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const 
vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto& in_data = assert_cast<const 
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+
+        const auto* end_in = in_data.data() + in_data.size();
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, 
DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+            *p_out = 
datetime_diff<HOUR>(PartitionColumnTransformUtils::epoch_datetime(), value);
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), 
null_map_column_ptr);
+            return {std::move(res_column),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    
DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& 
value) const override {
+        if (value.has_value()) {
+            return 
PartitionColumnTransformUtils::human_hour(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
+};
+
+class VoidPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    VoidPartitionColumnTransform(const TypeDescriptor& source_type)
+            : _source_type(source_type), _target_type(source_type) {}
+
+    std::string name() const override { return "Void"; }
+
+    const TypeDescriptor& get_result_type() const override { return 
_target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        const ColumnWithTypeAndName& column_with_type_and_name = 
block.get_by_position(column_pos);
 
-    virtual ColumnWithTypeAndName apply(Block& block, int idx);
+        ColumnPtr column_ptr;
+        ColumnPtr null_map_column_ptr;
+        if (auto* nullable_column =
+                    
check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) {
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        } else {
+            column_ptr = column_with_type_and_name.column;
+        }
+        const size_t row_count = column_ptr->size();
+        auto res_column = ColumnNullable::create(std::move(column_ptr),
+                                                 ColumnUInt8::create(row_count, 1));
+        return {std::move(res_column),
+                
DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                column_with_type_and_name.name};
+    }
 
 private:
     TypeDescriptor _source_type;
+    TypeDescriptor _target_type;
 };
 
 } // namespace vectorized
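
For reference, the Year/Month/Day/Hour transforms above all compute an integer offset of the input value from the Unix epoch (1970-01-01), which is what Iceberg's partition-transform spec requires. A minimal Java sketch of the same arithmetic, illustrative only and not part of this patch:

    import java.time.LocalDate;
    import java.time.LocalDateTime;
    import java.time.temporal.ChronoUnit;

    public class PartitionTransformMathSketch {
        private static final LocalDate EPOCH_DATE = LocalDate.of(1970, 1, 1);
        private static final LocalDateTime EPOCH_TS = LocalDateTime.of(1970, 1, 1, 0, 0);

        // Year/Month/Day transforms: offset of the date from the epoch in that unit.
        static int yearTransform(LocalDate d)  { return (int) ChronoUnit.YEARS.between(EPOCH_DATE, d); }
        static int monthTransform(LocalDate d) { return (int) ChronoUnit.MONTHS.between(EPOCH_DATE, d); }
        static int dayTransform(LocalDate d)   { return (int) ChronoUnit.DAYS.between(EPOCH_DATE, d); }

        // Hour transform: offset of the timestamp from the epoch in hours.
        static int hourTransform(LocalDateTime ts) { return (int) ChronoUnit.HOURS.between(EPOCH_TS, ts); }

        public static void main(String[] args) {
            LocalDate d = LocalDate.of(2024, 6, 22);
            System.out.println(yearTransform(d));   // 54
            System.out.println(monthTransform(d));  // 653
            System.out.println(dayTransform(d));    // 19896
        }
    }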
diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp 
b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
index 2703330406c..e59b0593f7b 100644
--- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
+++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp
@@ -329,8 +329,8 @@ std::vector<std::string> 
VIcebergTableWriter::_partition_values(
         TypeDescriptor result_type =
                 
iceberg_partition_column.partition_column_transform().get_result_type();
         partition_values.emplace_back(
-                
iceberg_partition_column.partition_column_transform().to_human_string(result_type,
-                                                                               
       data.get(i)));
+                
iceberg_partition_column.partition_column_transform().get_partition_value(
+                        result_type, data.get(i)));
     }
 
     return partition_values;
@@ -407,21 +407,25 @@ std::optional<PartitionData> 
VIcebergTableWriter::_get_partition_data(
 std::any VIcebergTableWriter::_get_iceberg_partition_value(
         const TypeDescriptor& type_desc, const ColumnWithTypeAndName& 
partition_column,
         int position) {
-    ColumnPtr column;
-    if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*partition_column.column)) {
+    //1) get the partition column ptr
+    ColumnPtr col_ptr = 
partition_column.column->convert_to_full_column_if_const();
+    CHECK(col_ptr != nullptr);
+    if (col_ptr->is_nullable()) {
+        const ColumnNullable* nullable_column =
+                reinterpret_cast<const 
vectorized::ColumnNullable*>(col_ptr.get());
         auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
         if (null_map_data[position]) {
             return std::any();
         }
-        column = nullable_column->get_nested_column_ptr();
-    } else {
-        column = partition_column.column;
+        col_ptr = nullable_column->get_nested_column_ptr();
     }
-    auto [item, size] = column->get_data_at(position);
+
+    //2) get partition field data from the partition block
+    auto [item, size] = col_ptr->get_data_at(position);
     switch (type_desc.type) {
     case TYPE_BOOLEAN: {
         vectorized::Field field =
-                vectorized::check_and_get_column<const 
ColumnUInt8>(*column)->operator[](position);
+                vectorized::check_and_get_column<const 
ColumnUInt8>(*col_ptr)->operator[](position);
         return field.get<bool>();
     }
     case TYPE_TINYINT: {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java
new file mode 100644
index 00000000000..6fdb27e1d0b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.info;
+
+import java.util.Objects;
+
+public class SimpleTableInfo {
+
+    private final String dbName;
+    private final String tbName;
+
+    public SimpleTableInfo(String dbName, String tbName) {
+        this.dbName = dbName;
+        this.tbName = tbName;
+    }
+
+    public String getDbName() {
+        return dbName;
+    }
+
+    public String getTbName() {
+        return tbName;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(dbName, tbName);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other == null || getClass() != other.getClass()) {
+            return false;
+        }
+
+        SimpleTableInfo that = (SimpleTableInfo) other;
+        return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, 
that.tbName);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s.%s", dbName, tbName);
+    }
+}
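
SimpleTableInfo derives equals/hashCode from (dbName, tbName), so it can be used directly as a per-table key. A small illustration, not part of the patch:

    import org.apache.doris.common.info.SimpleTableInfo;

    import java.util.HashMap;
    import java.util.Map;

    public class SimpleTableInfoSketch {
        public static void main(String[] args) {
            // equals/hashCode are derived from (dbName, tbName), so two instances
            // naming the same table resolve to the same map entry.
            Map<SimpleTableInfo, Long> pendingRows = new HashMap<>();
            pendingRows.put(new SimpleTableInfo("db1", "sales"), 100L);
            pendingRows.merge(new SimpleTableInfo("db1", "sales"), 50L, Long::sum);
            System.out.println(pendingRows.get(new SimpleTableInfo("db1", "sales"))); // 150
        }
    }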
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index acda08b7378..68064c4e439 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
@@ -85,6 +86,20 @@ public class IcebergMetadataCache {
         return tableCache.get(key);
     }
 
+    public Table getAndCloneTable(CatalogIf catalog, String dbName, String 
tbName) {
+        Table restTable;
+        synchronized (this) {
+            Table table = getIcebergTable(catalog, dbName, tbName);
+            restTable = SerializableTable.copyOf(table);
+        }
+        return restTable;
+    }
+
+    public Table getRemoteTable(CatalogIf catalog, String dbName, String 
tbName) {
+        IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, 
dbName, tbName);
+        return loadTable(key);
+    }
+
     @NotNull
     private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) {
         Table icebergTable = getIcebergTable(key.catalog, key.dbName, 
key.tableName);
@@ -116,7 +131,7 @@ public class IcebergMetadataCache {
     public void invalidateCatalogCache(long catalogId) {
         snapshotListCache.asMap().keySet().stream()
                 .filter(key -> key.catalog.getId() == catalogId)
-            .forEach(snapshotListCache::invalidate);
+                .forEach(snapshotListCache::invalidate);
 
         tableCache.asMap().entrySet().stream()
                 .filter(entry -> entry.getKey().catalog.getId() == catalogId)
@@ -130,7 +145,7 @@ public class IcebergMetadataCache {
         snapshotListCache.asMap().keySet().stream()
                 .filter(key -> key.catalog.getId() == catalogId && 
key.dbName.equals(dbName) && key.tableName.equals(
                         tblName))
-            .forEach(snapshotListCache::invalidate);
+                .forEach(snapshotListCache::invalidate);
 
         tableCache.asMap().entrySet().stream()
                 .filter(entry -> {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 7161f48680a..7bd8409ca2c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -64,6 +64,10 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
         return catalog;
     }
 
+    public IcebergExternalCatalog getExternalCatalog() {
+        return dorisCatalog;
+    }
+
     @Override
     public void close() {
     }
@@ -173,4 +177,5 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
         catalog.dropTable(TableIdentifier.of(dbName, tableName));
         db.setUnInitialized(true);
     }
+
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
index 5025e075142..a3a978ccd7a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java
@@ -21,31 +21,39 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.common.UserException;
-import org.apache.doris.thrift.TFileContent;
+import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext;
+import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;
 import org.apache.doris.thrift.TIcebergCommitData;
+import org.apache.doris.thrift.TUpdateMode;
 import org.apache.doris.transaction.Transaction;
 
-import com.google.common.base.VerifyException;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileContent;
-import org.apache.iceberg.Metrics;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.ReplacePartitions;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.WriteResult;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
-import java.util.stream.Collectors;
 
 public class IcebergTransaction implements Transaction {
 
     private static final Logger LOG = 
LogManager.getLogger(IcebergTransaction.class);
+
     private final IcebergMetadataOps ops;
+    private SimpleTableInfo tableInfo;
+    private Table table;
+
+
     private org.apache.iceberg.Transaction transaction;
     private final List<TIcebergCommitData> commitDataList = 
Lists.newArrayList();
 
@@ -59,140 +67,123 @@ public class IcebergTransaction implements Transaction {
         }
     }
 
-    public void beginInsert(String dbName, String tbName) {
-        Table icebergTable = 
ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName));
-        transaction = icebergTable.newTransaction();
+    public void beginInsert(SimpleTableInfo tableInfo) {
+        this.tableInfo = tableInfo;
+        this.table = getNativeTable(tableInfo);
+        this.transaction = table.newTransaction();
     }
 
-    public void finishInsert() {
-        Table icebergTable = transaction.table();
-        AppendFiles appendFiles = transaction.newAppend();
-
-        for (CommitTaskData task : convertToCommitTaskData()) {
-            DataFiles.Builder builder = DataFiles.builder(icebergTable.spec())
-                    .withPath(task.getPath())
-                    .withFileSizeInBytes(task.getFileSizeInBytes())
-                    .withFormat(IcebergUtils.getFileFormat(icebergTable))
-                    .withMetrics(task.getMetrics());
-
-            if (icebergTable.spec().isPartitioned()) {
-                List<String> partitionValues = task.getPartitionValues()
-                        .orElseThrow(() -> new VerifyException("No partition 
data for partitioned table"));
-                builder.withPartitionValues(partitionValues);
-            }
-            appendFiles.appendFile(builder.build());
+    public void finishInsert(SimpleTableInfo tableInfo, 
Optional<InsertCommandContext> insertCtx) {
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("iceberg table {} insert finished", tableInfo);
         }
 
-        // in appendFiles.commit, it will generate metadata(manifest and 
snapshot)
-        // after appendFiles.commit, in current transaction, you can already 
see the new snapshot
-        appendFiles.commit();
-    }
-
-    public List<CommitTaskData> convertToCommitTaskData() {
-        List<CommitTaskData> commitTaskData = new ArrayList<>();
-        for (TIcebergCommitData data : this.commitDataList) {
-            commitTaskData.add(new CommitTaskData(
-                    data.getFilePath(),
-                    data.getFileSize(),
-                    new Metrics(
-                            data.getRowCount(),
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP,
-                            Collections.EMPTY_MAP
-                    ),
-                    data.isSetPartitionValues() ? 
Optional.of(data.getPartitionValues()) : Optional.empty(),
-                    convertToFileContent(data.getFileContent()),
-                    data.isSetReferencedDataFiles() ? 
Optional.of(data.getReferencedDataFiles()) : Optional.empty()
-            ));
+        // determine the update mode and update the manifest
+        TUpdateMode updateMode = TUpdateMode.APPEND;
+        if (insertCtx.isPresent()) {
+            updateMode = ((BaseExternalTableInsertCommandContext) 
insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE
+                    : TUpdateMode.APPEND;
         }
-        return commitTaskData;
+        updateManifestAfterInsert(updateMode);
     }
 
-    private FileContent convertToFileContent(TFileContent content) {
-        if (content.equals(TFileContent.DATA)) {
-            return FileContent.DATA;
-        } else if (content.equals(TFileContent.POSITION_DELETES)) {
-            return FileContent.POSITION_DELETES;
+    private void updateManifestAfterInsert(TUpdateMode updateMode) {
+        PartitionSpec spec = table.spec();
+        FileFormat fileFormat = IcebergUtils.getFileFormat(table);
+
+        // convert commitDataList to a WriteResult
+        WriteResult writeResult = IcebergWriterHelper
+                .convertToWriterResult(fileFormat, spec, commitDataList);
+        List<WriteResult> pendingResults = Lists.newArrayList(writeResult);
+
+        if (spec.isPartitioned()) {
+            partitionManifestUpdate(updateMode, table, pendingResults);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} {} table partition manifest updated successfully, writeResult: {}",
+                        tableInfo, updateMode, writeResult);
+            }
         } else {
-            return FileContent.EQUALITY_DELETES;
+            tableManifestUpdate(updateMode, table, pendingResults);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("{} {} table manifest updated successfully, writeResult: {}",
+                        tableInfo, updateMode, writeResult);
+            }
         }
     }
 
     @Override
     public void commit() throws UserException {
-        // Externally readable
-        // Manipulate the relevant data so that others can also see the latest 
table, such as:
-        //   1. hadoop: it will change the version number information in 
'version-hint.text'
-        //   2. hive: it will change the table properties, the most important 
thing is to revise 'metadata_location'
-        //   3. and so on ...
+        // commit the iceberg transaction
         transaction.commitTransaction();
     }
 
     @Override
     public void rollback() {
-
+        //do nothing
     }
 
     public long getUpdateCnt() {
         return 
commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum();
     }
 
-    public static class CommitTaskData {
-        private final String path;
-        private final long fileSizeInBytes;
-        private final Metrics metrics;
-        private final Optional<List<String>> partitionValues;
-        private final FileContent content;
-        private final Optional<List<String>> referencedDataFiles;
-
-        public CommitTaskData(String path,
-                              long fileSizeInBytes,
-                              Metrics metrics,
-                              Optional<List<String>> partitionValues,
-                              FileContent content,
-                              Optional<List<String>> referencedDataFiles) {
-            this.path = path;
-            this.fileSizeInBytes = fileSizeInBytes;
-            this.metrics = metrics;
-            this.partitionValues = 
convertPartitionValuesForNull(partitionValues);
-            this.content = content;
-            this.referencedDataFiles = referencedDataFiles;
-        }
 
-        private Optional<List<String>> 
convertPartitionValuesForNull(Optional<List<String>> partitionValues) {
-            if (!partitionValues.isPresent()) {
-                return partitionValues;
-            }
-            List<String> values = partitionValues.get();
-            if (!values.contains("null")) {
-                return partitionValues;
-            }
-            return Optional.of(values.stream().map(s -> s.equals("null") ? 
null : s).collect(Collectors.toList()));
-        }
+    private synchronized Table getNativeTable(SimpleTableInfo tableInfo) {
+        Objects.requireNonNull(tableInfo);
+        IcebergExternalCatalog externalCatalog = ops.getExternalCatalog();
+        return IcebergUtils.getRemoteTable(externalCatalog, tableInfo);
+    }
 
-        public String getPath() {
-            return path;
+    private void partitionManifestUpdate(TUpdateMode updateMode, Table table, 
List<WriteResult> pendingResults) {
+        if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
+            LOG.warn("{} partitionManifestUp method call but pendingResults is 
null or empty!", table.name());
+            return;
         }
-
-        public long getFileSizeInBytes() {
-            return fileSizeInBytes;
+        // Commit either an append or a replace-partitions update.
+        if (updateMode == TUpdateMode.APPEND) {
+            commitAppendTxn(table, pendingResults);
+        } else {
+            ReplacePartitions appendPartitionOp = table.newReplacePartitions();
+            for (WriteResult result : pendingResults) {
+                Preconditions.checkState(result.referencedDataFiles().length 
== 0,
+                        "Should have no referenced data files.");
+                
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
+            }
+            appendPartitionOp.commit();
         }
+    }
 
-        public Metrics getMetrics() {
-            return metrics;
+    private void tableManifestUpdate(TUpdateMode updateMode, Table table, 
List<WriteResult> pendingResults) {
+        if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) {
+            LOG.warn("{} tableManifestUp method call but pendingResults is 
null or empty!", table.name());
+            return;
         }
-
-        public Optional<List<String>> getPartitionValues() {
-            return partitionValues;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("{} tableManifestUpdate called", table.name());
         }
-
-        public FileContent getContent() {
-            return content;
+        if (updateMode == TUpdateMode.APPEND) {
+            commitAppendTxn(table, pendingResults);
+        } else {
+            ReplacePartitions appendPartitionOp = table.newReplacePartitions();
+            for (WriteResult result : pendingResults) {
+                Preconditions.checkState(result.referencedDataFiles().length 
== 0,
+                        "Should have no referenced data files.");
+                
Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile);
+            }
+            appendPartitionOp.commit();
         }
+    }
 
-        public Optional<List<String>> getReferencedDataFiles() {
-            return referencedDataFiles;
+
+    private void commitAppendTxn(Table table, List<WriteResult> 
pendingResults) {
+        // To be compatible with iceberg format V1.
+        AppendFiles appendFiles = table.newAppend();
+        for (WriteResult result : pendingResults) {
+            Preconditions.checkState(result.referencedDataFiles().length == 0,
+                    "Should have no referenced data files for append.");
+            Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
         }
+        appendFiles.commit();
     }
+
 }
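
For orientation, the FE-side call order around this class, written as a sketch that uses only the methods added above; the wrapper method and its caller are illustrative, not part of the patch:

    import org.apache.doris.common.UserException;
    import org.apache.doris.common.info.SimpleTableInfo;
    import org.apache.doris.datasource.iceberg.IcebergTransaction;
    import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext;

    import java.util.Optional;

    public class IcebergCommitFlowSketch {
        // Condensed FE-side call order for a single INSERT. The BE writers report
        // their TIcebergCommitData to the transaction between begin and finish.
        static long commitInsert(IcebergTransaction txn, SimpleTableInfo tableInfo,
                                 Optional<InsertCommandContext> insertCtx) throws UserException {
            txn.beginInsert(tableInfo);             // load the table, open the Iceberg transaction
            // ... BEs write data files and report commit data here ...
            txn.finishInsert(tableInfo, insertCtx); // stage an append or replace-partitions update
            txn.commit();                           // publish the new snapshot
            return txn.getUpdateCnt();              // row count summed from the reported commit data
        }
    }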
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index 2aa5dda35a4..512e6a3ee93 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -43,6 +43,7 @@ import org.apache.doris.catalog.StructField;
 import org.apache.doris.catalog.StructType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper;
@@ -50,6 +51,7 @@ import 
org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.google.common.collect.Lists;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -87,6 +89,8 @@ public class IcebergUtils {
     // https://iceberg.apache.org/spec/#schemas-and-data-types
     // All time and timestamp values are stored with microsecond precision
     private static final int ICEBERG_DATETIME_SCALE_MS = 6;
+    private static final String PARQUET_NAME = "parquet";
+    private static final String ORC_NAME = "orc";
 
     public static final String TOTAL_RECORDS = "total-records";
     public static final String TOTAL_POSITION_DELETES = 
"total-position-deletes";
@@ -522,8 +526,8 @@ public class IcebergUtils {
             case MAP:
                 Types.MapType map = (Types.MapType) type;
                 return new MapType(
-                    icebergTypeToDorisType(map.keyType()),
-                    icebergTypeToDorisType(map.valueType())
+                        icebergTypeToDorisType(map.keyType()),
+                        icebergTypeToDorisType(map.valueType())
                 );
             case STRUCT:
                 Types.StructType struct = (Types.StructType) type;
@@ -536,11 +540,30 @@ public class IcebergUtils {
         }
     }
 
+
     public static org.apache.iceberg.Table getIcebergTable(ExternalCatalog 
catalog, String dbName, String tblName) {
+        return getIcebergTableInternal(catalog, dbName, tblName, false);
+    }
+
+    public static org.apache.iceberg.Table getAndCloneTable(ExternalCatalog 
catalog, SimpleTableInfo tableInfo) {
+        return getIcebergTableInternal(catalog, tableInfo.getDbName(), 
tableInfo.getTbName(), true);
+    }
+
+    public static org.apache.iceberg.Table getRemoteTable(ExternalCatalog 
catalog, SimpleTableInfo tableInfo) {
         return Env.getCurrentEnv()
                 .getExtMetaCacheMgr()
                 .getIcebergMetadataCache()
-                .getIcebergTable(catalog, dbName, tblName);
+                .getRemoteTable(catalog, tableInfo.getDbName(), 
tableInfo.getTbName());
+    }
+
+    private static org.apache.iceberg.Table 
getIcebergTableInternal(ExternalCatalog catalog, String dbName,
+            String tblName,
+            boolean isClone) {
+        IcebergMetadataCache metadataCache = Env.getCurrentEnv()
+                .getExtMetaCacheMgr()
+                .getIcebergMetadataCache();
+        return isClone ? metadataCache.getAndCloneTable(catalog, dbName, 
tblName)
+                : metadataCache.getIcebergTable(catalog, dbName, tblName);
     }
 
     /**
@@ -587,17 +610,27 @@ public class IcebergUtils {
         return -1;
     }
 
-    public static String getFileFormat(Table table) {
-        Map<String, String> properties = table.properties();
+
+    public static FileFormat getFileFormat(Table icebergTable) {
+        Map<String, String> properties = icebergTable.properties();
+        String fileFormatName;
         if (properties.containsKey(WRITE_FORMAT)) {
-            return properties.get(WRITE_FORMAT);
+            fileFormatName = properties.get(WRITE_FORMAT);
+        } else {
+            fileFormatName = 
properties.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, PARQUET_NAME);
         }
-        if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
-            return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
+        FileFormat fileFormat;
+        if (fileFormatName.toLowerCase().contains(ORC_NAME)) {
+            fileFormat = FileFormat.ORC;
+        } else if (fileFormatName.toLowerCase().contains(PARQUET_NAME)) {
+            fileFormat = FileFormat.PARQUET;
+        } else {
+            throw new RuntimeException("Unsupported input format type: " + 
fileFormatName);
         }
-        return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+        return fileFormat;
     }
 
+
     public static String getFileCompress(Table table) {
         Map<String, String> properties = table.properties();
         if (properties.containsKey(COMPRESSION_CODEC)) {
@@ -605,11 +638,11 @@ public class IcebergUtils {
         } else if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) {
             return properties.get(SPARK_SQL_COMPRESSION_CODEC);
         }
-        String fileFormat = getFileFormat(table);
-        if (fileFormat.equalsIgnoreCase("parquet")) {
+        FileFormat fileFormat = getFileFormat(table);
+        if (fileFormat == FileFormat.PARQUET) {
             return properties.getOrDefault(
                     TableProperties.PARQUET_COMPRESSION, 
TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0);
-        } else if (fileFormat.equalsIgnoreCase("orc")) {
+        } else if (fileFormat == FileFormat.ORC) {
             return properties.getOrDefault(
                     TableProperties.ORC_COMPRESSION, 
TableProperties.ORC_COMPRESSION_DEFAULT);
         }
@@ -620,9 +653,10 @@ public class IcebergUtils {
         Map<String, String> properties = table.properties();
         if 
(properties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) {
             throw new NotSupportedException(
-                "Table " + table.name() + " specifies " + 
properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
-                    + " as a location provider. "
-                    + "Writing to Iceberg tables with custom location provider 
is not supported.");
+                    "Table " + table.name() + " specifies " + properties
+                            .get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)
+                            + " as a location provider. "
+                            + "Writing to Iceberg tables with custom location 
provider is not supported.");
         }
         String dataLocation = 
properties.get(TableProperties.WRITE_DATA_LOCATION);
         if (dataLocation == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
new file mode 100644
index 00000000000..4171a0536f9
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.helper;
+
+import org.apache.doris.datasource.statistics.CommonStatistics;
+import org.apache.doris.thrift.TIcebergCommitData;
+
+import com.google.common.base.VerifyException;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.io.WriteResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class IcebergWriterHelper {
+
+    private static final int DEFAULT_FILE_COUNT = 1;
+
+    public static WriteResult convertToWriterResult(
+            FileFormat format,
+            PartitionSpec spec,
+            List<TIcebergCommitData> commitDataList) {
+        List<DataFile> dataFiles = new ArrayList<>();
+        for (TIcebergCommitData commitData : commitDataList) {
+            // get the file path
+            String location = commitData.getFilePath();
+
+            //get the commit file statistics
+            long fileSize = commitData.getFileSize();
+            long recordCount = commitData.getRowCount();
+            CommonStatistics stat = new CommonStatistics(recordCount, 
DEFAULT_FILE_COUNT, fileSize);
+
+            Optional<List<String>> partValues = Optional.empty();
+            // get and check partition values when the table is partitioned
+            if (spec.isPartitioned()) {
+                List<String> partitionValues = commitData.getPartitionValues();
+                if (Objects.isNull(partitionValues) || 
partitionValues.isEmpty()) {
+                    throw new VerifyException("No partition data for 
partitioned table");
+                }
+                partitionValues = partitionValues.stream().map(s -> 
s.equals("null") ? null : s)
+                        .collect(Collectors.toList());
+                partValues = Optional.of(partitionValues);
+            }
+            DataFile dataFile = genDataFile(format, location, spec, 
partValues, stat);
+            dataFiles.add(dataFile);
+        }
+        return WriteResult.builder()
+                .addDataFiles(dataFiles)
+                .build();
+
+    }
+
+    public static DataFile genDataFile(
+            FileFormat format,
+            String location,
+            PartitionSpec spec,
+            Optional<List<String>> partValues,
+            CommonStatistics statistics) {
+
+        DataFiles.Builder builder = DataFiles.builder(spec)
+                .withPath(location)
+                .withFileSizeInBytes(statistics.getTotalFileBytes())
+                .withRecordCount(statistics.getRowCount())
+                .withFormat(format);
+
+        partValues.ifPresent(builder::withPartitionValues);
+
+        return builder.build();
+    }
+}
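
A sketch of how the helper is expected to be driven for an unpartitioned table; the TIcebergCommitData setter names follow the usual Thrift-generated pattern and are assumed here, not taken from this patch:

    import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper;
    import org.apache.doris.thrift.TIcebergCommitData;

    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.io.WriteResult;

    import java.util.Collections;

    public class IcebergWriterHelperSketch {
        public static void main(String[] args) {
            // One data file reported by a BE writer (setter names assumed from the
            // Thrift-generated bean for the fields the helper reads).
            TIcebergCommitData commitData = new TIcebergCommitData();
            commitData.setFilePath("hdfs://nn/warehouse/db/tbl/data/00000-0-0001.parquet");
            commitData.setFileSize(1024L);
            commitData.setRowCount(100L);

            // Unpartitioned table: no partition values are required.
            WriteResult result = IcebergWriterHelper.convertToWriterResult(
                    FileFormat.PARQUET, PartitionSpec.unpartitioned(),
                    Collections.singletonList(commitData));
            System.out.println(result.dataFiles().length); // 1
        }
    }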
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index e590e918344..56ff188f964 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -61,7 +61,7 @@ public class IcebergApiSource implements IcebergSource {
 
     @Override
     public String getFileFormat() {
-        return IcebergUtils.getFileFormat(originTable);
+        return IcebergUtils.getFileFormat(originTable).name();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
index 06b785a15f8..5e9860171d0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
@@ -41,7 +41,7 @@ public class IcebergHMSSource implements IcebergSource {
     private final org.apache.iceberg.Table icebergTable;
 
     public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc,
-                            Map<String, ColumnRange> columnNameToRange) {
+            Map<String, ColumnRange> columnNameToRange) {
         this.hmsTable = hmsTable;
         this.desc = desc;
         this.columnNameToRange = columnNameToRange;
@@ -58,7 +58,7 @@ public class IcebergHMSSource implements IcebergSource {
 
     @Override
     public String getFileFormat() throws DdlException, MetaNotFoundException {
-        return IcebergUtils.getFileFormat(icebergTable);
+        return IcebergUtils.getFileFormat(icebergTable).name();
     }
 
     public org.apache.iceberg.Table getIcebergTable() throws 
MetaNotFoundException {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
new file mode 100644
index 00000000000..9685dfdf35a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.statistics;
+
+public class CommonStatistics {
+
+    public static final CommonStatistics EMPTY = new CommonStatistics(0L, 0L, 
0L);
+
+    private final long rowCount;
+    private final long fileCount;
+    private final long totalFileBytes;
+
+    public CommonStatistics(long rowCount, long fileCount, long 
totalFileBytes) {
+        this.fileCount = fileCount;
+        this.rowCount = rowCount;
+        this.totalFileBytes = totalFileBytes;
+    }
+
+    public long getRowCount() {
+        return rowCount;
+    }
+
+    public long getFileCount() {
+        return fileCount;
+    }
+
+    public long getTotalFileBytes() {
+        return totalFileBytes;
+    }
+
+    public static CommonStatistics reduce(
+            CommonStatistics current,
+            CommonStatistics update,
+            ReduceOperator operator) {
+        return new CommonStatistics(
+                reduce(current.getRowCount(), update.getRowCount(), operator),
+                reduce(current.getFileCount(), update.getFileCount(), 
operator),
+                reduce(current.getTotalFileBytes(), 
update.getTotalFileBytes(), operator));
+    }
+
+    public static long reduce(long current, long update, ReduceOperator 
operator) {
+        if (current >= 0 && update >= 0) {
+            switch (operator) {
+                case ADD:
+                    return current + update;
+                case SUBTRACT:
+                    return current - update;
+                case MAX:
+                    return Math.max(current, update);
+                case MIN:
+                    return Math.min(current, update);
+                default:
+                    throw new IllegalArgumentException("Unexpected operator: " 
+ operator);
+            }
+        }
+
+        return 0;
+    }
+
+    public enum ReduceOperator {
+        ADD,
+        SUBTRACT,
+        MIN,
+        MAX,
+    }
+}
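
A short usage example for the reduce helper, illustrative only:

    import org.apache.doris.datasource.statistics.CommonStatistics;
    import org.apache.doris.datasource.statistics.CommonStatistics.ReduceOperator;

    public class CommonStatisticsSketch {
        public static void main(String[] args) {
            CommonStatistics fileA = new CommonStatistics(100L, 1L, 4096L);
            CommonStatistics fileB = new CommonStatistics(50L, 2L, 1024L);
            // Element-wise ADD: rowCount=150, fileCount=3, totalFileBytes=5120.
            CommonStatistics total = CommonStatistics.reduce(fileA, fileB, ReduceOperator.ADD);
            System.out.println(total.getRowCount() + "/" + total.getFileCount() + "/" + total.getTotalFileBytes());
        }
    }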
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
index b19c483c9f3..86b1f1ef0b7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java
@@ -18,6 +18,7 @@
 package org.apache.doris.nereids.trees.plans.commands.insert;
 
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergTransaction;
 import org.apache.doris.nereids.NereidsPlanner;
@@ -39,9 +40,9 @@ public class IcebergInsertExecutor extends 
BaseExternalTableInsertExecutor {
      * constructor
      */
     public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable 
table,
-                                 String labelName, NereidsPlanner planner,
-                                 Optional<InsertCommandContext> insertCtx,
-                                 boolean emptyInsert) {
+            String labelName, NereidsPlanner planner,
+            Optional<InsertCommandContext> insertCtx,
+            boolean emptyInsert) {
         super(ctx, table, labelName, planner, insertCtx, emptyInsert);
     }
 
@@ -51,11 +52,23 @@ public class IcebergInsertExecutor extends 
BaseExternalTableInsertExecutor {
         
coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData);
     }
 
+    @Override
+    protected void beforeExec() {
+        String dbName = ((IcebergExternalTable) table).getDbName();
+        String tbName = table.getName();
+        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
+        IcebergTransaction transaction = (IcebergTransaction) 
transactionManager.getTransaction(txnId);
+        transaction.beginInsert(tableInfo);
+    }
+
     @Override
     protected void doBeforeCommit() throws UserException {
+        String dbName = ((IcebergExternalTable) table).getDbName();
+        String tbName = table.getName();
+        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
         IcebergTransaction transaction = (IcebergTransaction) 
transactionManager.getTransaction(txnId);
-        loadedRows = transaction.getUpdateCnt();
-        transaction.finishInsert();
+        this.loadedRows = transaction.getUpdateCnt();
+        transaction.finishInsert(tableInfo, insertCtx);
     }
 
     @Override
@@ -63,9 +76,4 @@ public class IcebergInsertExecutor extends 
BaseExternalTableInsertExecutor {
         return TransactionType.ICEBERG;
     }
 
-    @Override
-    protected void beforeExec() {
-        IcebergTransaction transaction = (IcebergTransaction) 
transactionManager.getTransaction(txnId);
-        transaction.beginInsert(((IcebergExternalTable) table).getDbName(), 
table.getName());
-    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
index 659be7cb1fe..0e01b599964 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java
@@ -121,7 +121,7 @@ public class IcebergTableSink extends 
BaseExternalTableDataSink {
         }
 
         // file info
-        
tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable)));
+        
tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable).name()));
         
tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable)));
 
         // hadoop config
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
index 3d6486f9391..f4b802aaa99 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.transaction;
 
+
 import org.apache.doris.catalog.Env;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
@@ -55,12 +56,12 @@ public class IcebergTransactionManager implements 
TransactionManager {
     }
 
     @Override
-    public Transaction getTransaction(long id) {
+    public IcebergTransaction getTransaction(long id) {
         return getTransactionWithException(id);
     }
 
-    public Transaction getTransactionWithException(long id) {
-        Transaction icebergTransaction = transactions.get(id);
+    public IcebergTransaction getTransactionWithException(long id) {
+        IcebergTransaction icebergTransaction = transactions.get(id);
         if (icebergTransaction == null) {
             throw new RuntimeException("Can't find transaction for " + id);
         }
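
The manager change above uses a covariant return type: the override of getTransaction(long) narrows the declared return from Transaction to IcebergTransaction, so code typed against IcebergTransactionManager can skip the cast. A self-contained sketch of the pattern (the class names here are illustrative, not the Doris ones):

    // Covariant override: a subclass may narrow the return type it declares.
    interface Txn {}

    class IcebergTxn implements Txn {
        void commitToIceberg() { /* commit metadata to Iceberg */ }
    }

    interface TxnManager {
        Txn getTransaction(long id);
    }

    class IcebergTxnManager implements TxnManager {
        private final java.util.Map<Long, IcebergTxn> txns = new java.util.concurrent.ConcurrentHashMap<>();

        @Override
        public IcebergTxn getTransaction(long id) {  // narrower than Txn
            IcebergTxn txn = txns.get(id);
            if (txn == null) {
                throw new RuntimeException("Can't find transaction for " + id);
            }
            return txn;                              // callers need no cast
        }
    }
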
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
index 10de5427902..4375dc5c025 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java
@@ -18,15 +18,22 @@
 package org.apache.doris.datasource.iceberg;
 
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.info.SimpleTableInfo;
+import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.thrift.TFileContent;
 import org.apache.doris.thrift.TIcebergCommitData;
 
+import com.google.common.collect.Maps;
+import mockit.Mock;
+import mockit.MockUp;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
@@ -39,10 +46,11 @@ import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.DateTimeUtil;
 import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.Instant;
@@ -50,23 +58,26 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class IcebergTransactionTest {
 
-    public static String dbName = "db3";
-    public static String tbWithPartition = "tbWithPartition";
-    public static String tbWithoutPartition = "tbWithoutPartition";
-    public static IcebergMetadataOps ops;
-    public static Schema schema;
+    private static String dbName = "db3";
+    private static String tbWithPartition = "tbWithPartition";
+    private static String tbWithoutPartition = "tbWithoutPartition";
 
-    @BeforeClass
-    public static void beforeClass() throws IOException {
+    private IcebergExternalCatalog externalCatalog;
+    private IcebergMetadataOps ops;
+
+
+    @Before
+    public void init() throws IOException {
         createCatalog();
         createTable();
     }
 
-    public static void createCatalog() throws IOException {
+    private void createCatalog() throws IOException {
         Path warehousePath = Files.createTempDirectory("test_warehouse_");
         String warehouse = "file://" + warehousePath.toAbsolutePath() + "/";
         HadoopCatalog hadoopCatalog = new HadoopCatalog();
@@ -74,25 +85,32 @@ public class IcebergTransactionTest {
         props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse);
         hadoopCatalog.setConf(new Configuration());
         hadoopCatalog.initialize("df", props);
-        ops = new IcebergMetadataOps(null, hadoopCatalog);
+        this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", "", Maps.newHashMap(), "");
+        new MockUp<IcebergHMSExternalCatalog>() {
+            @Mock
+            public Catalog getCatalog() {
+                return hadoopCatalog;
+            }
+        };
+        ops = new IcebergMetadataOps(externalCatalog, hadoopCatalog);
     }
 
-    public static void createTable() throws IOException {
+    private void createTable() throws IOException {
         HadoopCatalog icebergCatalog = (HadoopCatalog) ops.getCatalog();
         icebergCatalog.createNamespace(Namespace.of(dbName));
-        schema = new Schema(
-            Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()),
-            Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()),
-            Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()),
-            Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()),
-            Types.NestedField.required(15, "dt1", Types.DateType.get()),
-            Types.NestedField.required(16, "dt2", Types.DateType.get()),
-            Types.NestedField.required(17, "dt3", Types.DateType.get()),
-            Types.NestedField.required(18, "dt4", Types.DateType.get()),
-            Types.NestedField.required(19, "str1", Types.StringType.get()),
-            Types.NestedField.required(20, "str2", Types.StringType.get()),
-            Types.NestedField.required(21, "int1", Types.IntegerType.get()),
-            Types.NestedField.required(22, "int2", Types.IntegerType.get())
+        Schema schema = new Schema(
+                Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()),
+                Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()),
+                Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()),
+                Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()),
+                Types.NestedField.required(15, "dt1", Types.DateType.get()),
+                Types.NestedField.required(16, "dt2", Types.DateType.get()),
+                Types.NestedField.required(17, "dt3", Types.DateType.get()),
+                Types.NestedField.required(18, "dt4", Types.DateType.get()),
+                Types.NestedField.required(19, "str1", Types.StringType.get()),
+                Types.NestedField.required(20, "str2", Types.StringType.get()),
+                Types.NestedField.required(21, "int1", Types.IntegerType.get()),
+                Types.NestedField.required(22, "int2", Types.IntegerType.get())
         );
 
         PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
@@ -112,7 +130,7 @@ public class IcebergTransactionTest {
         icebergCatalog.createTable(TableIdentifier.of(dbName, tbWithoutPartition), schema);
     }
 
-    public List<String> createPartitionValues() {
+    private List<String> createPartitionValues() {
 
         Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z");
         long ts = DateTimeUtil.microsFromInstant(instant);
@@ -165,14 +183,23 @@ public class IcebergTransactionTest {
         ctdList.add(ctd1);
         ctdList.add(ctd2);
 
+        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition));
+
+        new MockUp<IcebergUtils>() {
+            @Mock
+            public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
+                return table;
+            }
+        };
+
         IcebergTransaction txn = getTxn();
         txn.updateIcebergCommitData(ctdList);
-        txn.beginInsert(dbName, tbWithPartition);
-        txn.finishInsert();
+        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition);
+        txn.beginInsert(tableInfo);
+        txn.finishInsert(tableInfo, Optional.empty());
         txn.commit();
-        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition));
-        checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
 
+        checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
         checkPushDownByPartitionForTs(table, "ts1");
         checkPushDownByPartitionForTs(table, "ts2");
         checkPushDownByPartitionForTs(table, "ts3");
@@ -189,7 +216,7 @@ public class IcebergTransactionTest {
         checkPushDownByPartitionForBucketInt(table, "int1");
     }
 
-    public void checkPushDownByPartitionForBucketInt(Table table, String column) {
+    private void checkPushDownByPartitionForBucketInt(Table table, String column) {
         // (BucketUtil.hash(15) & Integer.MAX_VALUE) % 2 = 0
         Integer i1 = 15;
 
@@ -212,12 +239,12 @@ public class IcebergTransactionTest {
         checkPushDownByPartition(table, greaterThan2, 2);
     }
 
-    public void checkPushDownByPartitionForString(Table table, String column) {
+    private void checkPushDownByPartitionForString(Table table, String column) {
         // Since the string used to create the partition is in date format, the date check can be reused directly
         checkPushDownByPartitionForDt(table, column);
     }
 
-    public void checkPushDownByPartitionForTs(Table table, String column) {
+    private void checkPushDownByPartitionForTs(Table table, String column) {
         String lessTs = "2023-12-11T12:34:56.123456";
         String eqTs = "2024-12-11T12:34:56.123456";
         String greaterTs = "2025-12-11T12:34:56.123456";
@@ -230,7 +257,7 @@ public class IcebergTransactionTest {
         checkPushDownByPartition(table, greaterThan, 0);
     }
 
-    public void checkPushDownByPartitionForDt(Table table, String column) {
+    private void checkPushDownByPartitionForDt(Table table, String column) {
         String less = "2023-12-11";
         String eq = "2024-12-11";
         String greater = "2025-12-11";
@@ -243,7 +270,7 @@ public class IcebergTransactionTest {
         checkPushDownByPartition(table, greaterThan, 0);
     }
 
-    public void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) {
+    private void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) {
         CloseableIterable<FileScanTask> fileScanTasks = table.newScan().filter(expr).planFiles();
         AtomicReference<Integer> cnt = new AtomicReference<>(0);
         fileScanTasks.forEach(notUse -> cnt.updateAndGet(v -> v + 1));
@@ -268,45 +295,64 @@ public class IcebergTransactionTest {
         ctdList.add(ctd1);
         ctdList.add(ctd2);
 
+        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
+        new MockUp<IcebergUtils>() {
+            @Mock
+            public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) {
+                return table;
+            }
+        };
+
         IcebergTransaction txn = getTxn();
         txn.updateIcebergCommitData(ctdList);
-        txn.beginInsert(dbName, tbWithoutPartition);
-        txn.finishInsert();
+        SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithoutPartition);
+        txn.beginInsert(tableInfo);
+        txn.finishInsert(tableInfo, Optional.empty());
         txn.commit();
 
-        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
         checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6");
     }
 
-    public void checkSnapshotProperties(Map<String, String> props,
-                                        String addRecords,
-                                        String addFileCnt,
-                                        String addFileSize) {
+    private IcebergTransaction getTxn() {
+        return new IcebergTransaction(ops);
+    }
+
+    private void checkSnapshotProperties(Map<String, String> props,
+            String addRecords,
+            String addFileCnt,
+            String addFileSize) {
         Assert.assertEquals(addRecords, props.get("added-records"));
         Assert.assertEquals(addFileCnt, props.get("added-data-files"));
         Assert.assertEquals(addFileSize, props.get("added-files-size"));
     }
 
-    public String numToYear(Integer num) {
+    private String numToYear(Integer num) {
         Transform<Object, Integer> year = Transforms.year();
         return year.toHumanString(Types.IntegerType.get(), num);
     }
 
-    public String numToMonth(Integer num) {
+    private String numToMonth(Integer num) {
         Transform<Object, Integer> month = Transforms.month();
         return month.toHumanString(Types.IntegerType.get(), num);
     }
 
-    public String numToDay(Integer num) {
+    private String numToDay(Integer num) {
         Transform<Object, Integer> day = Transforms.day();
         return day.toHumanString(Types.IntegerType.get(), num);
     }
 
-    public String numToHour(Integer num) {
+    private String numToHour(Integer num) {
         Transform<Object, Integer> hour = Transforms.hour();
         return hour.toHumanString(Types.IntegerType.get(), num);
     }
 
+    @Test
+    public void tableCloneTest() {
+        Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition));
+        Table cloneTable = (Table) SerializationUtils.clone((Serializable) table);
+        Assert.assertNotNull(cloneTable);
+    }
+
     @Test
     public void testTransform() {
         Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z");
@@ -322,7 +368,4 @@ public class IcebergTransactionTest {
         Assert.assertEquals("2024-12-11", numToDay(dt));
     }
 
-    public IcebergTransaction getTxn() {
-        return new IcebergTransaction(ops);
-    }
 }
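
For reference, the testTransform case above relies on Iceberg's transform utilities to turn partition ordinals back into human-readable values; the year transform, for instance, counts years since 1970. A small sketch using the same Iceberg calls as the test (the ordinal 54 is only an illustrative value, expected to render as "2024"):

    import org.apache.iceberg.transforms.Transform;
    import org.apache.iceberg.transforms.Transforms;
    import org.apache.iceberg.types.Types;

    public class TransformHumanStringDemo {
        public static void main(String[] args) {
            // Year ordinals count years since 1970, so 54 should render as "2024".
            Transform<Object, Integer> year = Transforms.year();
            System.out.println(year.toHumanString(Types.IntegerType.get(), 54));
        }
    }
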


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
