This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new d8d9f0a0de4 [Feature-WIP](iceberg-writer) Implements iceberg partition transform. (#36289)
d8d9f0a0de4 is described below

commit d8d9f0a0de486eb29b1f31f9b9fdb35bf54cc8af
Author: kang <35803862+ghkan...@users.noreply.github.com>
AuthorDate: Sat Jun 22 20:54:15 2024 +0800

    [Feature-WIP](iceberg-writer) Implements iceberg partition transform. (#36289)

    #31442
    Adds the Iceberg writer operators so that Doris can write directly into the data lake:

    1. Support INSERT INTO for Iceberg tables by appending HDFS files.
    2. Implement Iceberg partition routing through PartitionTransform.
       2.1) Serialize the partition spec and schema to JSON on the FE side, then deserialize them
            on the BE side to obtain the Iceberg table's schema and partition information.
       2.2) Then implement Iceberg's Identity, Bucket, Year/Month/Day and other partition
            strategies through PartitionTransform and template classes.
    3. Manage transactions through IcebergTransaction.
       3.1) After the BE side finishes writing files, it reports CommitData to the FE at
            partition granularity.
       3.2) After receiving the CommitData, the FE commits the metadata to Iceberg in
            IcebergTransaction.

    ### Future work
    - Add unit tests for the partition transform functions.
    - Implement the partition transform functions with the exchange sink turned on.
    - The partition transform functions currently omit handling of the BIGINT type.

    ---------

    Co-authored-by: lik40 <li...@chinatelecom.cn>
    Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
---
 be/src/util/bit_util.h                             |   22 +
 .../sink/writer/iceberg/partition_transformers.cpp |  168 ++-
 .../sink/writer/iceberg/partition_transformers.h   | 1275 +++++++++++++++++++-
 .../sink/writer/iceberg/viceberg_table_writer.cpp  |   22 +-
 .../apache/doris/common/info/SimpleTableInfo.java  |   66 +
 .../datasource/iceberg/IcebergMetadataCache.java   |   19 +-
 .../datasource/iceberg/IcebergMetadataOps.java     |    5 +
 .../datasource/iceberg/IcebergTransaction.java     |  211 ++--
 .../doris/datasource/iceberg/IcebergUtils.java     |   64 +-
 .../iceberg/helper/IcebergWriterHelper.java        |   91 ++
 .../iceberg/source/IcebergApiSource.java           |    2 +-
 .../iceberg/source/IcebergHMSSource.java           |    4 +-
 .../datasource/statistics/CommonStatistics.java    |   81 ++
 .../commands/insert/IcebergInsertExecutor.java     |   28 +-
 .../org/apache/doris/planner/IcebergTableSink.java |    2 +-
 .../transaction/IcebergTransactionManager.java     |    7 +-
 .../datasource/iceberg/IcebergTransactionTest.java |  139 ++-
 17 files changed, 1979 insertions(+), 227 deletions(-)

diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h
index 230134ade09..6934f45ef3e 100644
--- a/be/src/util/bit_util.h
+++ b/be/src/util/bit_util.h
@@ -98,6 +98,28 @@ public:
         return (v << n) >> n;
     }
 
+    template <typename T>
+    static std::string IntToByteBuffer(T input) {
+        std::string buffer;
+        T value = input;
+        for (int i = 0; i < sizeof(value); ++i) {
+            // Applies a mask for a byte range on the input.
+            char value_to_save = value & 0XFF;
+            buffer.push_back(value_to_save);
+            // Remove the just processed part from the input so that we can exit early if there
+            // is nothing left to process.
+            value >>= 8;
+            if (value == 0 && value_to_save >= 0) {
+                break;
+            }
+            if (value == -1 && value_to_save < 0) {
+                break;
+            }
+        }
+        std::reverse(buffer.begin(), buffer.end());
+        return buffer;
+    }
+
     // Returns ceil(log2(x)).
     // TODO: this could be faster if we use __builtin_clz.  Fix this if this ever shows up
     // in a hot path.
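BitUtil::IntToByteBuffer above exists to support decimal bucket partitioning: Iceberg buckets a
decimal by murmur-hashing the minimum-width big-endian two's-complement bytes of its unscaled
value, and DecimalBucketPartitionColumnTransform later in this patch feeds exactly the bytes
produced here into HashUtil::murmur_hash3_32. The standalone sketch below is not part of the
patch; it re-implements the same encoding with only the C++ standard library (the free function
name int_to_byte_buffer is ours, not Doris code) so the early-exit logic can be compiled and
inspected in isolation.

    // Standalone sketch: minimal big-endian two's-complement encoding of an integer,
    // mirroring BitUtil::IntToByteBuffer from the patch above.
    #include <algorithm>
    #include <cstdint>
    #include <iomanip>
    #include <iostream>
    #include <string>

    template <typename T>
    std::string int_to_byte_buffer(T input) {
        std::string buffer;
        T value = input;
        for (size_t i = 0; i < sizeof(value); ++i) {
            char low_byte = static_cast<char>(value & 0xFF); // take the lowest byte
            buffer.push_back(low_byte);
            value >>= 8; // on typical platforms this is an arithmetic shift, keeping the sign
            // Stop once the remaining high bytes are pure zero / sign extension.
            if (value == 0 && low_byte >= 0) break;
            if (value == -1 && low_byte < 0) break;
        }
        std::reverse(buffer.begin(), buffer.end()); // emit the most significant byte first
        return buffer;
    }

    int main() {
        // 300 (0x012C) encodes as two bytes, -1 as a single 0xFF byte.
        for (int64_t v : {int64_t(300), int64_t(-1), int64_t(1234567)}) {
            std::cout << v << " ->";
            for (unsigned char b : int_to_byte_buffer(v)) {
                std::cout << ' ' << std::hex << std::setw(2) << std::setfill('0')
                          << static_cast<int>(b) << std::dec;
            }
            std::cout << '\n';
        }
        return 0;
    }

Built with C++11 or later this prints "300 -> 01 2c", "-1 -> ff" and "1234567 -> 12 d6 87":
positive values stop as soon as the remaining high bytes are zero, negative values as soon as
they are pure sign extension.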
diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp index 0faebea6295..6b2a3305b9e 100644 --- a/be/src/vec/sink/writer/iceberg/partition_transformers.cpp +++ b/be/src/vec/sink/writer/iceberg/partition_transformers.cpp @@ -25,6 +25,9 @@ namespace doris { namespace vectorized { +const std::chrono::time_point<std::chrono::system_clock> PartitionColumnTransformUtils::EPOCH = + std::chrono::system_clock::from_time_t(0); + std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create( const doris::iceberg::PartitionField& field, const TypeDescriptor& source_type) { auto& transform = field.transform(); @@ -33,23 +36,98 @@ std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create( if (std::regex_match(transform, width_match, hasWidth)) { std::string name = width_match[1]; - //int parsed_width = std::stoi(width_match[2]); + int parsed_width = std::stoi(width_match[2]); if (name == "truncate") { switch (source_type.type) { + case TYPE_INT: { + return std::make_unique<IntegerTruncatePartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_BIGINT: { + return std::make_unique<BigintTruncatePartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + return std::make_unique<StringTruncatePartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_DECIMALV2: { + return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V2>>( + source_type, parsed_width); + } + case TYPE_DECIMAL32: { + return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal32>>( + source_type, parsed_width); + } + case TYPE_DECIMAL64: { + return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal64>>( + source_type, parsed_width); + } + case TYPE_DECIMAL128I: { + return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal128V3>>( + source_type, parsed_width); + } + case TYPE_DECIMAL256: { + return std::make_unique<DecimalTruncatePartitionColumnTransform<Decimal256>>( + source_type, parsed_width); + } default: { - throw doris::Exception( - doris::ErrorCode::INTERNAL_ERROR, - "Unsupported type for truncate partition column transform {}", - source_type.debug_string()); + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); } } } else if (name == "bucket") { switch (source_type.type) { + case TYPE_INT: { + return std::make_unique<IntBucketPartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_BIGINT: { + return std::make_unique<BigintBucketPartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_VARCHAR: + case TYPE_CHAR: + case TYPE_STRING: { + return std::make_unique<StringBucketPartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_DATEV2: { + return std::make_unique<DateBucketPartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_DATETIMEV2: { + return std::make_unique<TimestampBucketPartitionColumnTransform>(source_type, + parsed_width); + } + case TYPE_DECIMALV2: { + return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V2>>( + source_type, parsed_width); + } + case TYPE_DECIMAL32: { + return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal32>>( + source_type, parsed_width); + } + case TYPE_DECIMAL64: { + return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal64>>( + 
source_type, parsed_width); + } + case TYPE_DECIMAL128I: { + return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal128V3>>( + source_type, parsed_width); + } + case TYPE_DECIMAL256: { + return std::make_unique<DecimalBucketPartitionColumnTransform<Decimal256>>( + source_type, parsed_width); + } default: { throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, - "Unsupported type for bucket partition column transform {}", - source_type.debug_string()); + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); } } } @@ -57,14 +135,79 @@ std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create( if (transform == "identity") { return std::make_unique<IdentityPartitionColumnTransform>(source_type); + } else if (transform == "year") { + switch (source_type.type) { + case TYPE_DATEV2: { + return std::make_unique<DateYearPartitionColumnTransform>(source_type); + } + case TYPE_DATETIMEV2: { + return std::make_unique<TimestampYearPartitionColumnTransform>(source_type); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); + } + } + } else if (transform == "month") { + switch (source_type.type) { + case TYPE_DATEV2: { + return std::make_unique<DateMonthPartitionColumnTransform>(source_type); + } + case TYPE_DATETIMEV2: { + return std::make_unique<TimestampMonthPartitionColumnTransform>(source_type); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); + } + } + } else if (transform == "day") { + switch (source_type.type) { + case TYPE_DATEV2: { + return std::make_unique<DateDayPartitionColumnTransform>(source_type); + } + case TYPE_DATETIMEV2: { + return std::make_unique<TimestampDayPartitionColumnTransform>(source_type); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); + } + } + } else if (transform == "hour") { + switch (source_type.type) { + case TYPE_DATETIMEV2: { + return std::make_unique<TimestampHourPartitionColumnTransform>(source_type); + } + default: { + throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); + } + } + } else if (transform == "void") { + return std::make_unique<VoidPartitionColumnTransform>(source_type); } else { throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, - "Unsupported partition column transform: {}.", transform); + "Unsupported type {} for partition column transform {}", + source_type.debug_string(), transform); } } +std::string PartitionColumnTransform::name() const { + return "default"; +} + std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& type, const std::any& value) const { + return get_partition_value(type, value); +} + +std::string PartitionColumnTransform::get_partition_value(const TypeDescriptor& type, + const std::any& value) const { if (value.has_value()) { switch (type.type) { case TYPE_BOOLEAN: { @@ -131,19 +274,12 @@ std::string PartitionColumnTransform::to_human_string(const TypeDescriptor& type } default: { throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, - "Unsupported partition column transform: {}", - type.debug_string()); + "Unsupported type {} for partition", 
type.debug_string()); } } } return "null"; } -ColumnWithTypeAndName IdentityPartitionColumnTransform::apply(Block& block, int idx) { - const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(idx); - return {column_with_type_and_name.column, column_with_type_and_name.type, - column_with_type_and_name.name}; -} - } // namespace vectorized } // namespace doris diff --git a/be/src/vec/sink/writer/iceberg/partition_transformers.h b/be/src/vec/sink/writer/iceberg/partition_transformers.h index bfa2c43d2e5..7db8bfb1886 100644 --- a/be/src/vec/sink/writer/iceberg/partition_transformers.h +++ b/be/src/vec/sink/writer/iceberg/partition_transformers.h @@ -43,25 +43,81 @@ public: const doris::iceberg::PartitionField& field, const TypeDescriptor& source_type); }; +class PartitionColumnTransformUtils { +public: + static DateV2Value<DateV2ValueType>& epoch_date() { + static DateV2Value<DateV2ValueType> epoch_date; + static bool initialized = false; + if (!initialized) { + epoch_date.from_date_str("1970-01-01 00:00:00", 19); + initialized = true; + } + return epoch_date; + } + + static DateV2Value<DateTimeV2ValueType>& epoch_datetime() { + static DateV2Value<DateTimeV2ValueType> epoch_datetime; + static bool initialized = false; + if (!initialized) { + epoch_datetime.from_date_str("1970-01-01 00:00:00", 19); + initialized = true; + } + return epoch_datetime; + } + + static std::string human_year(int year_ordinal) { + auto year = std::chrono::year_month_day( + std::chrono::sys_days(std::chrono::floor<std::chrono::days>( + EPOCH + std::chrono::years(year_ordinal)))) + .year(); + return std::to_string(static_cast<int>(year)); + } + + static std::string human_month(int month_ordinal) { + auto ymd = std::chrono::year_month_day(std::chrono::sys_days( + std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::months(month_ordinal)))); + return fmt::format("{:04d}-{:02d}", static_cast<int>(ymd.year()), + static_cast<unsigned>(ymd.month())); + } + + static std::string human_day(int day_ordinal) { + auto ymd = std::chrono::year_month_day(std::chrono::sys_days( + std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::days(day_ordinal)))); + return fmt::format("{:04d}-{:02d}-{:02d}", static_cast<int>(ymd.year()), + static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day())); + } + + static std::string human_hour(int hour_ordinal) { + int day_value = hour_ordinal / 24; + int housr_value = hour_ordinal % 24; + auto ymd = std::chrono::year_month_day(std::chrono::sys_days( + std::chrono::floor<std::chrono::days>(EPOCH + std::chrono::days(day_value)))); + return fmt::format("{:04d}-{:02d}-{:02d}-{:02d}", static_cast<int>(ymd.year()), + static_cast<unsigned>(ymd.month()), static_cast<unsigned>(ymd.day()), + housr_value); + } + +private: + static const std::chrono::time_point<std::chrono::system_clock> EPOCH; + PartitionColumnTransformUtils() = default; +}; + class PartitionColumnTransform { public: PartitionColumnTransform() = default; virtual ~PartitionColumnTransform() = default; - virtual bool preserves_non_null() const { return false; } - - virtual bool monotonic() const { return true; } - - virtual bool temporal() const { return false; } + virtual std::string name() const; virtual const TypeDescriptor& get_result_type() const = 0; - virtual bool is_void() const { return false; } - - virtual ColumnWithTypeAndName apply(Block& block, int idx) = 0; + virtual ColumnWithTypeAndName apply(Block& block, int column_pos) = 0; virtual std::string to_human_string(const TypeDescriptor& type, 
const std::any& value) const; + + virtual std::string get_partition_value(const TypeDescriptor& type, + const std::any& value) const; }; class IdentityPartitionColumnTransform : public PartitionColumnTransform { @@ -69,12 +125,1211 @@ public: IdentityPartitionColumnTransform(const TypeDescriptor& source_type) : _source_type(source_type) {} - virtual const TypeDescriptor& get_result_type() const { return _source_type; } + std::string name() const override { return "Identity"; } + + const TypeDescriptor& get_result_type() const override { return _source_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + return {column_with_type_and_name.column, column_with_type_and_name.type, + column_with_type_and_name.name}; + } + +private: + TypeDescriptor _source_type; +}; + +class StringTruncatePartitionColumnTransform : public PartitionColumnTransform { +public: + StringTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width) + : _source_type(source_type), _width(width) {} + + std::string name() const override { return "StringTruncate"; } + + const TypeDescriptor& get_result_type() const override { return _source_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + auto int_type = std::make_shared<DataTypeInt32>(); + size_t num_columns_without_result = block.columns(); + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + + ColumnPtr string_column_ptr; + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (auto* nullable_column = + check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) { + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + string_column_ptr = nullable_column->get_nested_column_ptr(); + is_nullable = true; + } else { + string_column_ptr = column_with_type_and_name.column; + is_nullable = false; + } + block.replace_by_position(column_pos, std::move(string_column_ptr)); + block.insert( + {int_type->create_column_const(block.rows(), to_field(1)), int_type, "const 1"}); + block.insert({int_type->create_column_const(block.rows(), to_field(_width)), int_type, + fmt::format("const {}", _width)}); + block.insert({nullptr, std::make_shared<DataTypeString>(), "result"}); + ColumnNumbers temp_arguments(3); + temp_arguments[0] = column_pos; // str column + temp_arguments[1] = num_columns_without_result; // pos + temp_arguments[2] = num_columns_without_result + 1; // width + size_t result_column_id = num_columns_without_result + 2; + + SubstringUtil::substring_execute(block, temp_arguments, result_column_id, block.rows()); + if (is_nullable) { + auto res_column = ColumnNullable::create(block.get_by_position(result_column_id).column, + null_map_column_ptr); + Block::erase_useless_column(&block, num_columns_without_result); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + auto res_column = block.get_by_position(result_column_id).column; + Block::erase_useless_column(&block, num_columns_without_result); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _width; +}; + +class IntegerTruncatePartitionColumnTransform : public PartitionColumnTransform { +public: + 
IntegerTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width) + : _source_type(source_type), _width(width) {} + + std::string name() const override { return "IntegerTruncate"; } + + const TypeDescriptor& get_result_type() const override { return _source_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnInt32*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const int* end_in = in_data.data() + in_data.size(); + const Int32* __restrict p_in = in_data.data(); + Int32* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + *p_out = *p_in - ((*p_in % _width) + _width) % _width; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {res_column, + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _width; +}; + +class BigintTruncatePartitionColumnTransform : public PartitionColumnTransform { +public: + BigintTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width) + : _source_type(source_type), _width(width) {} + + std::string name() const override { return "BigintTruncate"; } + + const TypeDescriptor& get_result_type() const override { return _source_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnInt64*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt64::create(); + ColumnInt64::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const Int64* end_in = in_data.data() + in_data.size(); + const Int64* __restrict p_in = in_data.data(); + Int64* __restrict p_out = out_data.data(); + + while (p_in < 
end_in) { + *p_out = *p_in - ((*p_in % _width) + _width) % _width; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {res_column, + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _width; +}; + +template <typename T> +class DecimalTruncatePartitionColumnTransform : public PartitionColumnTransform { +public: + DecimalTruncatePartitionColumnTransform(const TypeDescriptor& source_type, int width) + : _source_type(source_type), _width(width) {} + + std::string name() const override { return "DecimalTruncate"; } + + const TypeDescriptor& get_result_type() const override { return _source_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + + ColumnPtr column_ptr; + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (auto* nullable_column = + check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) { + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + is_nullable = true; + } else { + column_ptr = column_with_type_and_name.column; + is_nullable = false; + } + + const auto* const decimal_col = check_and_get_column<ColumnDecimal<T>>(column_ptr); + const auto& vec_src = decimal_col->get_data(); + + auto col_res = ColumnDecimal<T>::create(vec_src.size(), decimal_col->get_scale()); + auto& vec_res = col_res->get_data(); + + const typename T::NativeType* __restrict p_in = + reinterpret_cast<const T::NativeType*>(vec_src.data()); + const typename T::NativeType* end_in = + reinterpret_cast<const T::NativeType*>(vec_src.data()) + vec_src.size(); + typename T::NativeType* __restrict p_out = reinterpret_cast<T::NativeType*>(vec_res.data()); + + while (p_in < end_in) { + typename T::NativeType remainder = ((*p_in % _width) + _width) % _width; + *p_out = *p_in - remainder; + ++p_in; + ++p_out; + } + + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {res_column, + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _width; +}; + +class IntBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + IntBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {} + + std::string name() const override { return "IntBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from 
block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnInt32*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const int* end_in = in_data.data() + in_data.size(); + const Int32* __restrict p_in = in_data.data(); + Int32* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + Int64 long_value = static_cast<Int64>(*p_in); + uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0); + *p_out = ((hash_value >> 1) & INT32_MAX) % _bucket_num; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +class BigintBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + BigintBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {} + + std::string name() const override { return "BigintBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnInt64*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt64::create(); + ColumnInt64::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const Int64* end_in = in_data.data() + in_data.size(); + const Int64* __restrict p_in = in_data.data(); + Int64* __restrict p_out = out_data.data(); + while (p_in < end_in) { + Int64 long_value = static_cast<Int64>(*p_in); + uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0); + int value = ((hash_value >> 1) & INT32_MAX) % _bucket_num; + *p_out = value; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), 
+ DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +template <typename T> +class DecimalBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + DecimalBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {} + + std::string name() const override { return "DecimalBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDecimal<T>*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + auto& vec_res = col_res->get_data(); + + const typename T::NativeType* __restrict p_in = + reinterpret_cast<const T::NativeType*>(in_data.data()); + const typename T::NativeType* end_in = + reinterpret_cast<const T::NativeType*>(in_data.data()) + in_data.size(); + typename T::NativeType* __restrict p_out = reinterpret_cast<T::NativeType*>(vec_res.data()); + + while (p_in < end_in) { + std::string buffer = BitUtil::IntToByteBuffer(*p_in); + uint32_t hash_value = HashUtil::murmur_hash3_32(buffer.data(), buffer.size(), 0); + *p_out = ((hash_value >> 1) & INT32_MAX) % _bucket_num; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + return get_partition_value(type, value); + } + + std::string get_partition_value(const TypeDescriptor& type, + const std::any& value) const override { + if (value.has_value()) { + return std::to_string(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +class DateBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + DateBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), 
_target_type(TYPE_INT) {} + + std::string name() const override { return "DateBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + const auto* end_in = in_data.data() + in_data.size(); + + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateV2ValueType> value = + binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in); + + int32_t days_from_unix_epoch = value.daynr() - 719528; + Int64 long_value = static_cast<Int64>(days_from_unix_epoch); + uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0); + + *p_out = ((hash_value >> 1) & INT32_MAX) % _bucket_num; + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +class TimestampBucketPartitionColumnTransform : public PartitionColumnTransform { +public: + TimestampBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num) + : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {} + + std::string name() const override { return "TimestampBucket"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const 
ColumnDateTimeV2*>(column_ptr.get())->get_data();
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(in_data.size());
+        const auto* end_in = in_data.data() + in_data.size();
+
+        const auto* __restrict p_in = in_data.data();
+        auto* __restrict p_out = out_data.data();
+
+        while (p_in < end_in) {
+            DateV2Value<DateTimeV2ValueType> value =
+                    binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in);
+
+            int64_t timestamp;
+            if (!value.unix_timestamp(&timestamp, "UTC")) {
+                LOG(WARNING) << "Failed to call unix_timestamp :" << value.debug_string();
+                timestamp = 0;
+            }
+            Int64 long_value = static_cast<Int64>(timestamp) * 1000000;
+            uint32_t hash_value = HashUtil::murmur_hash3_32(&long_value, sizeof(long_value), 0);
+
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+            ++p_in;
+            ++p_out;
+        }
+
+        //4) create the partition column and return
+        if (is_nullable) {
+            auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr);
+            return {std::move(res_column),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), true),
+                    column_with_type_and_name.name};
+        } else {
+            return {std::move(col_res),
+                    DataTypeFactory::instance().create_data_type(get_result_type(), false),
+                    column_with_type_and_name.name};
+        }
+    }
+
+    std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override {
+        if (value.has_value()) {
+            return std::to_string(std::any_cast<Int32>(value));
+        } else {
+            return "null";
+        }
+    }
+
+private:
+    TypeDescriptor _source_type;
+    int _bucket_num;
+    TypeDescriptor _target_type;
+};
+
+class StringBucketPartitionColumnTransform : public PartitionColumnTransform {
+public:
+    StringBucketPartitionColumnTransform(const TypeDescriptor& source_type, int bucket_num)
+            : _source_type(source_type), _bucket_num(bucket_num), _target_type(TYPE_INT) {}
+
+    std::string name() const override { return "StringBucket"; }
+
+    const TypeDescriptor& get_result_type() const override { return _target_type; }
+
+    ColumnWithTypeAndName apply(Block& block, int column_pos) override {
+        //1) get the target column ptr
+        const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos);
+        ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const();
+        CHECK(column_ptr != nullptr);
+
+        //2) get the input data from block
+        ColumnPtr null_map_column_ptr;
+        bool is_nullable = false;
+        if (column_ptr->is_nullable()) {
+            const ColumnNullable* nullable_column =
+                    reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
+            is_nullable = true;
+            null_map_column_ptr = nullable_column->get_null_map_column_ptr();
+            column_ptr = nullable_column->get_nested_column_ptr();
+        }
+        const auto* str_col = assert_cast<const ColumnString*>(column_ptr.get());
+
+        //3) do partition routing
+        auto col_res = ColumnInt32::create();
+        const auto& data = str_col->get_chars();
+        const auto& offsets = str_col->get_offsets();
+
+        size_t offset_size = offsets.size();
+        ColumnInt32::Container& out_data = col_res->get_data();
+        out_data.resize(offset_size);
+        auto* __restrict p_out = out_data.data();
+
+        for (int i = 0; i < offset_size; i++) {
+            const unsigned char* raw_str = &data[offsets[i - 1]];
+            ColumnString::Offset size = offsets[i] - offsets[i - 1];
+            uint32_t hash_value = HashUtil::murmur_hash3_32(raw_str, size, 0);
+
+            *p_out = (hash_value & INT32_MAX) % _bucket_num;
+            ++p_out;
+        }
+
+        //4) create the partition
column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + +private: + TypeDescriptor _source_type; + int _bucket_num; + TypeDescriptor _target_type; +}; + +class DateYearPartitionColumnTransform : public PartitionColumnTransform { +public: + DateYearPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "DateYear"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateV2ValueType> value = + binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in); + *p_out = datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_date(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class TimestampYearPartitionColumnTransform : public PartitionColumnTransform { +public: + TimestampYearPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "TimestampYear"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& 
column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateTimeV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); + *p_out = datetime_diff<YEAR>(PartitionColumnTransformUtils::epoch_datetime(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_year(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class DateMonthPartitionColumnTransform : public PartitionColumnTransform { +public: + DateMonthPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "DateMonth"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + 
DateV2Value<DateV2ValueType> value = + binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in); + *p_out = datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_date(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class TimestampMonthPartitionColumnTransform : public PartitionColumnTransform { +public: + TimestampMonthPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "TimestampMonth"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateTimeV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); + *p_out = datetime_diff<MONTH>(PartitionColumnTransformUtils::epoch_datetime(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_month(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class 
DateDayPartitionColumnTransform : public PartitionColumnTransform { +public: + DateDayPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "DateDay"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateV2ValueType> value = + binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in); + *p_out = datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_date(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + return get_partition_value(type, value); + } + + std::string get_partition_value(const TypeDescriptor& type, + const std::any& value) const override { + if (value.has_value()) { + int day_value = std::any_cast<Int32>(value); + return PartitionColumnTransformUtils::human_day(day_value); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class TimestampDayPartitionColumnTransform : public PartitionColumnTransform { +public: + TimestampDayPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "TimestampDay"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + 
reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateTimeV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); + *p_out = datetime_diff<DAY>(PartitionColumnTransformUtils::epoch_datetime(), value); + ++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + return get_partition_value(type, value); + } + + std::string get_partition_value(const TypeDescriptor& type, + const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_day(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class TimestampHourPartitionColumnTransform : public PartitionColumnTransform { +public: + TimestampHourPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(TYPE_INT) {} + + std::string name() const override { return "TimestampHour"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + //1) get the target column ptr + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); + ColumnPtr column_ptr = column_with_type_and_name.column->convert_to_full_column_if_const(); + CHECK(column_ptr != nullptr); + + //2) get the input data from block + ColumnPtr null_map_column_ptr; + bool is_nullable = false; + if (column_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get()); + is_nullable = true; + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } + const auto& in_data = assert_cast<const ColumnDateTimeV2*>(column_ptr.get())->get_data(); + + //3) do partition routing + auto col_res = ColumnInt32::create(); + ColumnInt32::Container& out_data = col_res->get_data(); + out_data.resize(in_data.size()); + + const auto* end_in = in_data.data() + in_data.size(); + const auto* __restrict p_in = in_data.data(); + auto* __restrict p_out = out_data.data(); + + while (p_in < end_in) { + DateV2Value<DateTimeV2ValueType> value = + binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(UInt64*)p_in); + *p_out = datetime_diff<HOUR>(PartitionColumnTransformUtils::epoch_datetime(), value); + 
++p_in; + ++p_out; + } + + //4) create the partition column and return + if (is_nullable) { + auto res_column = ColumnNullable::create(std::move(col_res), null_map_column_ptr); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } else { + return {std::move(col_res), + DataTypeFactory::instance().create_data_type(get_result_type(), false), + column_with_type_and_name.name}; + } + } + + std::string to_human_string(const TypeDescriptor& type, const std::any& value) const override { + if (value.has_value()) { + return PartitionColumnTransformUtils::human_hour(std::any_cast<Int32>(value)); + } else { + return "null"; + } + } + +private: + TypeDescriptor _source_type; + TypeDescriptor _target_type; +}; + +class VoidPartitionColumnTransform : public PartitionColumnTransform { +public: + VoidPartitionColumnTransform(const TypeDescriptor& source_type) + : _source_type(source_type), _target_type(source_type) {} + + std::string name() const override { return "Void"; } + + const TypeDescriptor& get_result_type() const override { return _target_type; } + + ColumnWithTypeAndName apply(Block& block, int column_pos) override { + const ColumnWithTypeAndName& column_with_type_and_name = block.get_by_position(column_pos); - virtual ColumnWithTypeAndName apply(Block& block, int idx); + ColumnPtr column_ptr; + ColumnPtr null_map_column_ptr; + if (auto* nullable_column = + check_and_get_column<ColumnNullable>(column_with_type_and_name.column)) { + null_map_column_ptr = nullable_column->get_null_map_column_ptr(); + column_ptr = nullable_column->get_nested_column_ptr(); + } else { + column_ptr = column_with_type_and_name.column; + } + auto res_column = ColumnNullable::create(std::move(column_ptr), + ColumnUInt8::create(column_ptr->size(), 1)); + return {std::move(res_column), + DataTypeFactory::instance().create_data_type(get_result_type(), true), + column_with_type_and_name.name}; + } private: TypeDescriptor _source_type; + TypeDescriptor _target_type; }; } // namespace vectorized diff --git a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp index 2703330406c..e59b0593f7b 100644 --- a/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp +++ b/be/src/vec/sink/writer/iceberg/viceberg_table_writer.cpp @@ -329,8 +329,8 @@ std::vector<std::string> VIcebergTableWriter::_partition_values( TypeDescriptor result_type = iceberg_partition_column.partition_column_transform().get_result_type(); partition_values.emplace_back( - iceberg_partition_column.partition_column_transform().to_human_string(result_type, - data.get(i))); + iceberg_partition_column.partition_column_transform().get_partition_value( + result_type, data.get(i))); } return partition_values; @@ -407,21 +407,25 @@ std::optional<PartitionData> VIcebergTableWriter::_get_partition_data( std::any VIcebergTableWriter::_get_iceberg_partition_value( const TypeDescriptor& type_desc, const ColumnWithTypeAndName& partition_column, int position) { - ColumnPtr column; - if (auto* nullable_column = check_and_get_column<ColumnNullable>(*partition_column.column)) { + //1) get the partition column ptr + ColumnPtr col_ptr = partition_column.column->convert_to_full_column_if_const(); + CHECK(col_ptr != nullptr); + if (col_ptr->is_nullable()) { + const ColumnNullable* nullable_column = + reinterpret_cast<const vectorized::ColumnNullable*>(col_ptr.get()); auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data(); if (null_map_data[position]) { return std::any(); } - column = nullable_column->get_nested_column_ptr(); - } else { - column = partition_column.column; + col_ptr = nullable_column->get_nested_column_ptr(); } - auto [item, size] = column->get_data_at(position); + + //2) get partition field data from the partition block + auto [item, size] = col_ptr->get_data_at(position); switch (type_desc.type) { case TYPE_BOOLEAN: { vectorized::Field field = - vectorized::check_and_get_column<const ColumnUInt8>(*column)->operator[](position); + vectorized::check_and_get_column<const ColumnUInt8>(*col_ptr)->operator[](position); return field.get<bool>(); } case TYPE_TINYINT: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java new file mode 100644 index 00000000000..6fdb27e1d0b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/info/SimpleTableInfo.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
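
A side note on the BE-side TimestampDay/TimestampHour transforms above: both boil down to counting whole days (or hours) between the Unix epoch and the timestamp, and that count becomes the Iceberg partition value. A minimal Java sketch of the same arithmetic, assuming UTC semantics; the class and method names are illustrative and not part of this patch:

    import java.time.LocalDateTime;
    import java.time.temporal.ChronoUnit;

    public class EpochTransformSketch {
        private static final LocalDateTime EPOCH = LocalDateTime.of(1970, 1, 1, 0, 0);

        // Day transform: whole days between the Unix epoch and the timestamp.
        static int dayOrdinal(LocalDateTime ts) {
            return (int) ChronoUnit.DAYS.between(EPOCH, ts);
        }

        // Hour transform: whole hours between the Unix epoch and the timestamp.
        static int hourOrdinal(LocalDateTime ts) {
            return (int) ChronoUnit.HOURS.between(EPOCH, ts);
        }

        public static void main(String[] args) {
            LocalDateTime ts = LocalDateTime.parse("2024-12-11T12:34:56.123456");
            System.out.println(dayOrdinal(ts));  // days since 1970-01-01
            System.out.println(hourOrdinal(ts)); // hours since 1970-01-01T00:00
        }
    }
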
+// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/AnalysisException.java +// and modified by Doris + +package org.apache.doris.common.info; + +import java.util.Objects; + +public class SimpleTableInfo { + + private final String dbName; + private final String tbName; + + public SimpleTableInfo(String dbName, String tbName) { + this.dbName = dbName; + this.tbName = tbName; + } + + public String getDbName() { + return dbName; + } + + public String getTbName() { + return tbName; + } + + @Override + public int hashCode() { + return Objects.hash(dbName, tbName); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + SimpleTableInfo that = (SimpleTableInfo) other; + return Objects.equals(dbName, that.dbName) && Objects.equals(tbName, that.tbName); + } + + @Override + public String toString() { + return String.format("%s.%s", dbName, tbName); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java index acda08b7378..68064c4e439 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java @@ -33,6 +33,7 @@ import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; @@ -85,6 +86,20 @@ public class IcebergMetadataCache { return tableCache.get(key); } + public Table getAndCloneTable(CatalogIf catalog, String dbName, String tbName) { + Table restTable; + synchronized (this) { + Table table = getIcebergTable(catalog, dbName, tbName); + restTable = SerializableTable.copyOf(table); + } + return restTable; + } + + public Table getRemoteTable(CatalogIf catalog, String dbName, String tbName) { + IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, dbName, tbName); + return loadTable(key); + } + @NotNull private List<Snapshot> loadSnapshots(IcebergMetadataCacheKey key) { Table icebergTable = getIcebergTable(key.catalog, key.dbName, key.tableName); @@ -116,7 +131,7 @@ public class IcebergMetadataCache { public void invalidateCatalogCache(long catalogId) { snapshotListCache.asMap().keySet().stream() .filter(key -> key.catalog.getId() == catalogId) - .forEach(snapshotListCache::invalidate); + .forEach(snapshotListCache::invalidate); tableCache.asMap().entrySet().stream() .filter(entry -> entry.getKey().catalog.getId() == catalogId) @@ -130,7 +145,7 @@ public class IcebergMetadataCache { snapshotListCache.asMap().keySet().stream() .filter(key -> key.catalog.getId() == catalogId && key.dbName.equals(dbName) && key.tableName.equals( tblName)) - .forEach(snapshotListCache::invalidate); + .forEach(snapshotListCache::invalidate); tableCache.asMap().entrySet().stream() .filter(entry -> { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index 7161f48680a..7bd8409ca2c 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -64,6 +64,10 @@ public class IcebergMetadataOps implements ExternalMetadataOps { return catalog; } + public IcebergExternalCatalog getExternalCatalog() { + return dorisCatalog; + } + @Override public void close() { } @@ -173,4 +177,5 @@ public class IcebergMetadataOps implements ExternalMetadataOps { catalog.dropTable(TableIdentifier.of(dbName, tableName)); db.setUnInitialized(true); } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java index 5025e075142..a3a978ccd7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergTransaction.java @@ -21,31 +21,39 @@ package org.apache.doris.datasource.iceberg; import org.apache.doris.common.UserException; -import org.apache.doris.thrift.TFileContent; +import org.apache.doris.common.info.SimpleTableInfo; +import org.apache.doris.datasource.iceberg.helper.IcebergWriterHelper; +import org.apache.doris.nereids.trees.plans.commands.insert.BaseExternalTableInsertCommandContext; +import org.apache.doris.nereids.trees.plans.commands.insert.InsertCommandContext; import org.apache.doris.thrift.TIcebergCommitData; +import org.apache.doris.thrift.TUpdateMode; import org.apache.doris.transaction.Transaction; -import com.google.common.base.VerifyException; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.DataFiles; -import org.apache.iceberg.FileContent; -import org.apache.iceberg.Metrics; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.ReplacePartitions; import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.WriteResult; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.Optional; -import java.util.stream.Collectors; public class IcebergTransaction implements Transaction { private static final Logger LOG = LogManager.getLogger(IcebergTransaction.class); + private final IcebergMetadataOps ops; + private SimpleTableInfo tableInfo; + private Table table; + + private org.apache.iceberg.Transaction transaction; private final List<TIcebergCommitData> commitDataList = Lists.newArrayList(); @@ -59,140 +67,123 @@ public class IcebergTransaction implements Transaction { } } - public void beginInsert(String dbName, String tbName) { - Table icebergTable = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName)); - transaction = icebergTable.newTransaction(); + public void beginInsert(SimpleTableInfo tableInfo) { + this.tableInfo = tableInfo; + this.table = getNativeTable(tableInfo); + this.transaction = table.newTransaction(); } - public void finishInsert() { - Table icebergTable = transaction.table(); - AppendFiles appendFiles = transaction.newAppend(); - - for (CommitTaskData task : convertToCommitTaskData()) { - DataFiles.Builder builder = DataFiles.builder(icebergTable.spec()) - .withPath(task.getPath()) - 
.withFileSizeInBytes(task.getFileSizeInBytes()) - .withFormat(IcebergUtils.getFileFormat(icebergTable)) - .withMetrics(task.getMetrics()); - - if (icebergTable.spec().isPartitioned()) { - List<String> partitionValues = task.getPartitionValues() - .orElseThrow(() -> new VerifyException("No partition data for partitioned table")); - builder.withPartitionValues(partitionValues); - } - appendFiles.appendFile(builder.build()); + public void finishInsert(SimpleTableInfo tableInfo, Optional<InsertCommandContext> insertCtx) { + if (LOG.isDebugEnabled()) { + LOG.info("iceberg table {} insert table finished!", tableInfo); } - // in appendFiles.commit, it will generate metadata(manifest and snapshot) - // after appendFiles.commit, in current transaction, you can already see the new snapshot - appendFiles.commit(); - } - - public List<CommitTaskData> convertToCommitTaskData() { - List<CommitTaskData> commitTaskData = new ArrayList<>(); - for (TIcebergCommitData data : this.commitDataList) { - commitTaskData.add(new CommitTaskData( - data.getFilePath(), - data.getFileSize(), - new Metrics( - data.getRowCount(), - Collections.EMPTY_MAP, - Collections.EMPTY_MAP, - Collections.EMPTY_MAP, - Collections.EMPTY_MAP - ), - data.isSetPartitionValues() ? Optional.of(data.getPartitionValues()) : Optional.empty(), - convertToFileContent(data.getFileContent()), - data.isSetReferencedDataFiles() ? Optional.of(data.getReferencedDataFiles()) : Optional.empty() - )); + //create and start the iceberg transaction + TUpdateMode updateMode = TUpdateMode.APPEND; + if (insertCtx.isPresent()) { + updateMode = ((BaseExternalTableInsertCommandContext) insertCtx.get()).isOverwrite() ? TUpdateMode.OVERWRITE + : TUpdateMode.APPEND; } - return commitTaskData; + updateManifestAfterInsert(updateMode); } - private FileContent convertToFileContent(TFileContent content) { - if (content.equals(TFileContent.DATA)) { - return FileContent.DATA; - } else if (content.equals(TFileContent.POSITION_DELETES)) { - return FileContent.POSITION_DELETES; + private void updateManifestAfterInsert(TUpdateMode updateMode) { + PartitionSpec spec = table.spec(); + FileFormat fileFormat = IcebergUtils.getFileFormat(table); + + //convert commitDataList to writeResult + WriteResult writeResult = IcebergWriterHelper + .convertToWriterResult(fileFormat, spec, commitDataList); + List<WriteResult> pendingResults = Lists.newArrayList(writeResult); + + if (spec.isPartitioned()) { + partitionManifestUpdate(updateMode, table, pendingResults); + if (LOG.isDebugEnabled()) { + LOG.info("{} {} table partition manifest successful and writeResult : {}..", tableInfo, updateMode, + writeResult); + } } else { - return FileContent.EQUALITY_DELETES; + tableManifestUpdate(updateMode, table, pendingResults); + if (LOG.isDebugEnabled()) { + LOG.info("{} {} table manifest successful and writeResult : {}..", tableInfo, updateMode, + writeResult); + } } } @Override public void commit() throws UserException { - // Externally readable - // Manipulate the relevant data so that others can also see the latest table, such as: - // 1. hadoop: it will change the version number information in 'version-hint.text' - // 2. hive: it will change the table properties, the most important thing is to revise 'metadata_location' - // 3. and so on ... 
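
Taken together, the reworked FE-side flow is: beginInsert loads the Iceberg table and opens a transaction, finishInsert turns the BE-reported commit data into data files and updates the manifest (append or replace-partitions, depending on the insert mode), and commit publishes the new snapshot. A minimal usage sketch, mirroring what IcebergTransactionTest further below exercises and assuming an initialized IcebergMetadataOps named ops; the file path and sizes are made up for illustration, and the setters follow the generated thrift bean:

    // Commit data normally reported by a BE after it finishes writing a file.
    TIcebergCommitData ctd = new TIcebergCommitData();
    ctd.setFilePath("hdfs://ns1/warehouse/db3/tb/data/00000-0-example.parquet"); // hypothetical path
    ctd.setFileContent(TFileContent.DATA);
    ctd.setRowCount(3);
    ctd.setFileSize(1024);

    SimpleTableInfo tableInfo = new SimpleTableInfo("db3", "tbWithPartition");
    IcebergTransaction txn = new IcebergTransaction(ops);
    txn.updateIcebergCommitData(Lists.newArrayList(ctd)); // collect BE-reported commit data
    txn.beginInsert(tableInfo);                           // load table, open Iceberg transaction
    txn.finishInsert(tableInfo, Optional.empty());        // add the data files to the manifest
    txn.commit();                                         // publish the snapshot
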
+ // commit the iceberg transaction transaction.commitTransaction(); } @Override public void rollback() { - + //do nothing } public long getUpdateCnt() { return commitDataList.stream().mapToLong(TIcebergCommitData::getRowCount).sum(); } - public static class CommitTaskData { - private final String path; - private final long fileSizeInBytes; - private final Metrics metrics; - private final Optional<List<String>> partitionValues; - private final FileContent content; - private final Optional<List<String>> referencedDataFiles; - - public CommitTaskData(String path, - long fileSizeInBytes, - Metrics metrics, - Optional<List<String>> partitionValues, - FileContent content, - Optional<List<String>> referencedDataFiles) { - this.path = path; - this.fileSizeInBytes = fileSizeInBytes; - this.metrics = metrics; - this.partitionValues = convertPartitionValuesForNull(partitionValues); - this.content = content; - this.referencedDataFiles = referencedDataFiles; - } - private Optional<List<String>> convertPartitionValuesForNull(Optional<List<String>> partitionValues) { - if (!partitionValues.isPresent()) { - return partitionValues; - } - List<String> values = partitionValues.get(); - if (!values.contains("null")) { - return partitionValues; - } - return Optional.of(values.stream().map(s -> s.equals("null") ? null : s).collect(Collectors.toList())); - } + private synchronized Table getNativeTable(SimpleTableInfo tableInfo) { + Objects.requireNonNull(tableInfo); + IcebergExternalCatalog externalCatalog = ops.getExternalCatalog(); + return IcebergUtils.getRemoteTable(externalCatalog, tableInfo); + } - public String getPath() { - return path; + private void partitionManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) { + if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) { + LOG.warn("{} partitionManifestUp method call but pendingResults is null or empty!", table.name()); + return; } - - public long getFileSizeInBytes() { - return fileSizeInBytes; + // Commit the appendPartitionOperator transaction. + if (updateMode == TUpdateMode.APPEND) { + commitAppendTxn(table, pendingResults); + } else { + ReplacePartitions appendPartitionOp = table.newReplacePartitions(); + for (WriteResult result : pendingResults) { + Preconditions.checkState(result.referencedDataFiles().length == 0, + "Should have no referenced data files."); + Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile); + } + appendPartitionOp.commit(); } + } - public Metrics getMetrics() { - return metrics; + private void tableManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) { + if (Objects.isNull(pendingResults) || pendingResults.isEmpty()) { + LOG.warn("{} tableManifestUp method call but pendingResults is null or empty!", table.name()); + return; } - - public Optional<List<String>> getPartitionValues() { - return partitionValues; + // Commit the appendPartitionOperator transaction. 
+ if (LOG.isDebugEnabled()) { + LOG.info("{} tableManifestUp method call ", table.name()); } - - public FileContent getContent() { - return content; + if (updateMode == TUpdateMode.APPEND) { + commitAppendTxn(table, pendingResults); + } else { + ReplacePartitions appendPartitionOp = table.newReplacePartitions(); + for (WriteResult result : pendingResults) { + Preconditions.checkState(result.referencedDataFiles().length == 0, + "Should have no referenced data files."); + Arrays.stream(result.dataFiles()).forEach(appendPartitionOp::addFile); + } + appendPartitionOp.commit(); } + } - public Optional<List<String>> getReferencedDataFiles() { - return referencedDataFiles; + + private void commitAppendTxn(Table table, List<WriteResult> pendingResults) { + // To be compatible with iceberg format V1. + AppendFiles appendFiles = table.newAppend(); + for (WriteResult result : pendingResults) { + Preconditions.checkState(result.referencedDataFiles().length == 0, + "Should have no referenced data files for append."); + Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile); } + appendFiles.commit(); } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java index 2aa5dda35a4..512e6a3ee93 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java @@ -43,6 +43,7 @@ import org.apache.doris.catalog.StructField; import org.apache.doris.catalog.StructType; import org.apache.doris.catalog.Type; import org.apache.doris.common.UserException; +import org.apache.doris.common.info.SimpleTableInfo; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.hive.HiveMetaStoreClientHelper; @@ -50,6 +51,7 @@ import org.apache.doris.nereids.exceptions.NotSupportedException; import org.apache.doris.thrift.TExprOpcode; import com.google.common.collect.Lists; +import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; @@ -87,6 +89,8 @@ public class IcebergUtils { // https://iceberg.apache.org/spec/#schemas-and-data-types // All time and timestamp values are stored with microsecond precision private static final int ICEBERG_DATETIME_SCALE_MS = 6; + private static final String PARQUET_NAME = "parquet"; + private static final String ORC_NAME = "orc"; public static final String TOTAL_RECORDS = "total-records"; public static final String TOTAL_POSITION_DELETES = "total-position-deletes"; @@ -522,8 +526,8 @@ public class IcebergUtils { case MAP: Types.MapType map = (Types.MapType) type; return new MapType( - icebergTypeToDorisType(map.keyType()), - icebergTypeToDorisType(map.valueType()) + icebergTypeToDorisType(map.keyType()), + icebergTypeToDorisType(map.valueType()) ); case STRUCT: Types.StructType struct = (Types.StructType) type; @@ -536,11 +540,30 @@ public class IcebergUtils { } } + public static org.apache.iceberg.Table getIcebergTable(ExternalCatalog catalog, String dbName, String tblName) { + return getIcebergTableInternal(catalog, dbName, tblName, false); + } + + public static org.apache.iceberg.Table getAndCloneTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) { + return getIcebergTableInternal(catalog, tableInfo.getDbName(), tableInfo.getTbName(), true); + } + + public static org.apache.iceberg.Table 
getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) { return Env.getCurrentEnv() .getExtMetaCacheMgr() .getIcebergMetadataCache() - .getIcebergTable(catalog, dbName, tblName); + .getRemoteTable(catalog, tableInfo.getDbName(), tableInfo.getTbName()); + } + + private static org.apache.iceberg.Table getIcebergTableInternal(ExternalCatalog catalog, String dbName, + String tblName, + boolean isClone) { + IcebergMetadataCache metadataCache = Env.getCurrentEnv() + .getExtMetaCacheMgr() + .getIcebergMetadataCache(); + return isClone ? metadataCache.getAndCloneTable(catalog, dbName, tblName) + : metadataCache.getIcebergTable(catalog, dbName, tblName); } /** @@ -587,17 +610,27 @@ public class IcebergUtils { return -1; } - public static String getFileFormat(Table table) { - Map<String, String> properties = table.properties(); + + public static FileFormat getFileFormat(Table icebergTable) { + Map<String, String> properties = icebergTable.properties(); + String fileFormatName; if (properties.containsKey(WRITE_FORMAT)) { - return properties.get(WRITE_FORMAT); + fileFormatName = properties.get(WRITE_FORMAT); + } else { + fileFormatName = properties.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, PARQUET_NAME); } - if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) { - return properties.get(TableProperties.DEFAULT_FILE_FORMAT); + FileFormat fileFormat; + if (fileFormatName.toLowerCase().contains(ORC_NAME)) { + fileFormat = FileFormat.ORC; + } else if (fileFormatName.toLowerCase().contains(PARQUET_NAME)) { + fileFormat = FileFormat.PARQUET; + } else { + throw new RuntimeException("Unsupported input format type: " + fileFormatName); } - return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; + return fileFormat; } + public static String getFileCompress(Table table) { Map<String, String> properties = table.properties(); if (properties.containsKey(COMPRESSION_CODEC)) { @@ -605,11 +638,11 @@ public class IcebergUtils { } else if (properties.containsKey(SPARK_SQL_COMPRESSION_CODEC)) { return properties.get(SPARK_SQL_COMPRESSION_CODEC); } - String fileFormat = getFileFormat(table); - if (fileFormat.equalsIgnoreCase("parquet")) { + FileFormat fileFormat = getFileFormat(table); + if (fileFormat == FileFormat.PARQUET) { return properties.getOrDefault( TableProperties.PARQUET_COMPRESSION, TableProperties.PARQUET_COMPRESSION_DEFAULT_SINCE_1_4_0); - } else if (fileFormat.equalsIgnoreCase("orc")) { + } else if (fileFormat == FileFormat.ORC) { return properties.getOrDefault( TableProperties.ORC_COMPRESSION, TableProperties.ORC_COMPRESSION_DEFAULT); } @@ -620,9 +653,10 @@ public class IcebergUtils { Map<String, String> properties = table.properties(); if (properties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL)) { throw new NotSupportedException( - "Table " + table.name() + " specifies " + properties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL) - + " as a location provider. " - + "Writing to Iceberg tables with custom location provider is not supported."); + "Table " + table.name() + " specifies " + properties + .get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL) + + " as a location provider. 
" + + "Writing to Iceberg tables with custom location provider is not supported."); } String dataLocation = properties.get(TableProperties.WRITE_DATA_LOCATION); if (dataLocation == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java new file mode 100644 index 00000000000..4171a0536f9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/helper/IcebergWriterHelper.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.iceberg.helper; + +import org.apache.doris.datasource.statistics.CommonStatistics; +import org.apache.doris.thrift.TIcebergCommitData; + +import com.google.common.base.VerifyException; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.io.WriteResult; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +public class IcebergWriterHelper { + + private static final int DEFAULT_FILE_COUNT = 1; + + public static WriteResult convertToWriterResult( + FileFormat format, + PartitionSpec spec, + List<TIcebergCommitData> commitDataList) { + List<DataFile> dataFiles = new ArrayList<>(); + for (TIcebergCommitData commitData : commitDataList) { + //get the files path + String location = commitData.getFilePath(); + + //get the commit file statistics + long fileSize = commitData.getFileSize(); + long recordCount = commitData.getRowCount(); + CommonStatistics stat = new CommonStatistics(recordCount, DEFAULT_FILE_COUNT, fileSize); + + Optional<List<String>> partValues = Optional.empty(); + //get and check partitionValues when table is partitionedTable + if (spec.isPartitioned()) { + List<String> partitionValues = commitData.getPartitionValues(); + if (Objects.isNull(partitionValues) || partitionValues.isEmpty()) { + throw new VerifyException("No partition data for partitioned table"); + } + partitionValues = partitionValues.stream().map(s -> s.equals("null") ? 
null : s) + .collect(Collectors.toList()); + partValues = Optional.of(partitionValues); + } + DataFile dataFile = genDataFile(format, location, spec, partValues, stat); + dataFiles.add(dataFile); + } + return WriteResult.builder() + .addDataFiles(dataFiles) + .build(); + + } + + public static DataFile genDataFile( + FileFormat format, + String location, + PartitionSpec spec, + Optional<List<String>> partValues, + CommonStatistics statistics) { + + DataFiles.Builder builder = DataFiles.builder(spec) + .withPath(location) + .withFileSizeInBytes(statistics.getTotalFileBytes()) + .withRecordCount(statistics.getRowCount()) + .withFormat(format); + + partValues.ifPresent(builder::withPartitionValues); + + return builder.build(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java index e590e918344..56ff188f964 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java @@ -61,7 +61,7 @@ public class IcebergApiSource implements IcebergSource { @Override public String getFileFormat() { - return IcebergUtils.getFileFormat(originTable); + return IcebergUtils.getFileFormat(originTable).name(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java index 06b785a15f8..5e9860171d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java @@ -41,7 +41,7 @@ public class IcebergHMSSource implements IcebergSource { private final org.apache.iceberg.Table icebergTable; public IcebergHMSSource(HMSExternalTable hmsTable, TupleDescriptor desc, - Map<String, ColumnRange> columnNameToRange) { + Map<String, ColumnRange> columnNameToRange) { this.hmsTable = hmsTable; this.desc = desc; this.columnNameToRange = columnNameToRange; @@ -58,7 +58,7 @@ public class IcebergHMSSource implements IcebergSource { @Override public String getFileFormat() throws DdlException, MetaNotFoundException { - return IcebergUtils.getFileFormat(icebergTable); + return IcebergUtils.getFileFormat(icebergTable).name(); } public org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java new file mode 100644 index 00000000000..9685dfdf35a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/statistics/CommonStatistics.java @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.statistics; + +public class CommonStatistics { + + public static final CommonStatistics EMPTY = new CommonStatistics(0L, 0L, 0L); + + private final long rowCount; + private final long fileCount; + private final long totalFileBytes; + + public CommonStatistics(long rowCount, long fileCount, long totalFileBytes) { + this.fileCount = fileCount; + this.rowCount = rowCount; + this.totalFileBytes = totalFileBytes; + } + + public long getRowCount() { + return rowCount; + } + + public long getFileCount() { + return fileCount; + } + + public long getTotalFileBytes() { + return totalFileBytes; + } + + public static CommonStatistics reduce( + CommonStatistics current, + CommonStatistics update, + ReduceOperator operator) { + return new CommonStatistics( + reduce(current.getRowCount(), update.getRowCount(), operator), + reduce(current.getFileCount(), update.getFileCount(), operator), + reduce(current.getTotalFileBytes(), update.getTotalFileBytes(), operator)); + } + + public static long reduce(long current, long update, ReduceOperator operator) { + if (current >= 0 && update >= 0) { + switch (operator) { + case ADD: + return current + update; + case SUBTRACT: + return current - update; + case MAX: + return Math.max(current, update); + case MIN: + return Math.min(current, update); + default: + throw new IllegalArgumentException("Unexpected operator: " + operator); + } + } + + return 0; + } + + public enum ReduceOperator { + ADD, + SUBTRACT, + MIN, + MAX, + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java index b19c483c9f3..86b1f1ef0b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/IcebergInsertExecutor.java @@ -18,6 +18,7 @@ package org.apache.doris.nereids.trees.plans.commands.insert; import org.apache.doris.common.UserException; +import org.apache.doris.common.info.SimpleTableInfo; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.IcebergTransaction; import org.apache.doris.nereids.NereidsPlanner; @@ -39,9 +40,9 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor { * constructor */ public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table, - String labelName, NereidsPlanner planner, - Optional<InsertCommandContext> insertCtx, - boolean emptyInsert) { + String labelName, NereidsPlanner planner, + Optional<InsertCommandContext> insertCtx, + boolean emptyInsert) { super(ctx, table, labelName, planner, insertCtx, emptyInsert); } @@ -51,11 +52,23 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor { coordinator.setIcebergCommitDataFunc(transaction::updateIcebergCommitData); } + @Override + protected void beforeExec() { + String dbName = ((IcebergExternalTable) table).getDbName(); + String tbName 
= table.getName(); + SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName); + IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId); + transaction.beginInsert(tableInfo); + } + @Override protected void doBeforeCommit() throws UserException { + String dbName = ((IcebergExternalTable) table).getDbName(); + String tbName = table.getName(); + SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName); IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId); - loadedRows = transaction.getUpdateCnt(); - transaction.finishInsert(); + this.loadedRows = transaction.getUpdateCnt(); + transaction.finishInsert(tableInfo, insertCtx); } @Override @@ -63,9 +76,4 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor { return TransactionType.ICEBERG; } - @Override - protected void beforeExec() { - IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId); - transaction.beginInsert(((IcebergExternalTable) table).getDbName(), table.getName()); - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java index 659be7cb1fe..0e01b599964 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergTableSink.java @@ -121,7 +121,7 @@ public class IcebergTableSink extends BaseExternalTableDataSink { } // file info - tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable))); + tSink.setFileFormat(getTFileFormatType(IcebergUtils.getFileFormat(icebergTable).name())); tSink.setCompressionType(getTFileCompressType(IcebergUtils.getFileCompress(icebergTable))); // hadoop config diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java index 3d6486f9391..f4b802aaa99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/IcebergTransactionManager.java @@ -17,6 +17,7 @@ package org.apache.doris.transaction; + import org.apache.doris.catalog.Env; import org.apache.doris.common.UserException; import org.apache.doris.datasource.iceberg.IcebergMetadataOps; @@ -55,12 +56,12 @@ public class IcebergTransactionManager implements TransactionManager { } @Override - public Transaction getTransaction(long id) { + public IcebergTransaction getTransaction(long id) { return getTransactionWithException(id); } - public Transaction getTransactionWithException(long id) { - Transaction icebergTransaction = transactions.get(id); + public IcebergTransaction getTransactionWithException(long id) { + IcebergTransaction icebergTransaction = transactions.get(id); if (icebergTransaction == null) { throw new RuntimeException("Can't find transaction for " + id); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java index 10de5427902..4375dc5c025 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergTransactionTest.java @@ -18,15 +18,22 @@ package org.apache.doris.datasource.iceberg; import 
org.apache.doris.common.UserException; +import org.apache.doris.common.info.SimpleTableInfo; +import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.thrift.TFileContent; import org.apache.doris.thrift.TIcebergCommitData; +import com.google.common.collect.Maps; +import mockit.Mock; +import mockit.MockUp; +import org.apache.commons.lang3.SerializationUtils; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.expressions.Expression; @@ -39,10 +46,11 @@ import org.apache.iceberg.transforms.Transforms; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; import org.junit.Assert; -import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; import java.io.IOException; +import java.io.Serializable; import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; @@ -50,23 +58,26 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; public class IcebergTransactionTest { - public static String dbName = "db3"; - public static String tbWithPartition = "tbWithPartition"; - public static String tbWithoutPartition = "tbWithoutPartition"; - public static IcebergMetadataOps ops; - public static Schema schema; + private static String dbName = "db3"; + private static String tbWithPartition = "tbWithPartition"; + private static String tbWithoutPartition = "tbWithoutPartition"; - @BeforeClass - public static void beforeClass() throws IOException { + private IcebergExternalCatalog externalCatalog; + private IcebergMetadataOps ops; + + + @Before + public void init() throws IOException { createCatalog(); createTable(); } - public static void createCatalog() throws IOException { + private void createCatalog() throws IOException { Path warehousePath = Files.createTempDirectory("test_warehouse_"); String warehouse = "file://" + warehousePath.toAbsolutePath() + "/"; HadoopCatalog hadoopCatalog = new HadoopCatalog(); @@ -74,25 +85,32 @@ public class IcebergTransactionTest { props.put(CatalogProperties.WAREHOUSE_LOCATION, warehouse); hadoopCatalog.setConf(new Configuration()); hadoopCatalog.initialize("df", props); - ops = new IcebergMetadataOps(null, hadoopCatalog); + this.externalCatalog = new IcebergHMSExternalCatalog(1L, "iceberg", "", Maps.newHashMap(), ""); + new MockUp<IcebergHMSExternalCatalog>() { + @Mock + public Catalog getCatalog() { + return hadoopCatalog; + } + }; + ops = new IcebergMetadataOps(externalCatalog, hadoopCatalog); } - public static void createTable() throws IOException { + private void createTable() throws IOException { HadoopCatalog icebergCatalog = (HadoopCatalog) ops.getCatalog(); icebergCatalog.createNamespace(Namespace.of(dbName)); - schema = new Schema( - Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()), - Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()), - Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()), - Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()), - Types.NestedField.required(15, "dt1", Types.DateType.get()), - 
Types.NestedField.required(16, "dt2", Types.DateType.get()), - Types.NestedField.required(17, "dt3", Types.DateType.get()), - Types.NestedField.required(18, "dt4", Types.DateType.get()), - Types.NestedField.required(19, "str1", Types.StringType.get()), - Types.NestedField.required(20, "str2", Types.StringType.get()), - Types.NestedField.required(21, "int1", Types.IntegerType.get()), - Types.NestedField.required(22, "int2", Types.IntegerType.get()) + Schema schema = new Schema( + Types.NestedField.required(11, "ts1", Types.TimestampType.withoutZone()), + Types.NestedField.required(12, "ts2", Types.TimestampType.withoutZone()), + Types.NestedField.required(13, "ts3", Types.TimestampType.withoutZone()), + Types.NestedField.required(14, "ts4", Types.TimestampType.withoutZone()), + Types.NestedField.required(15, "dt1", Types.DateType.get()), + Types.NestedField.required(16, "dt2", Types.DateType.get()), + Types.NestedField.required(17, "dt3", Types.DateType.get()), + Types.NestedField.required(18, "dt4", Types.DateType.get()), + Types.NestedField.required(19, "str1", Types.StringType.get()), + Types.NestedField.required(20, "str2", Types.StringType.get()), + Types.NestedField.required(21, "int1", Types.IntegerType.get()), + Types.NestedField.required(22, "int2", Types.IntegerType.get()) ); PartitionSpec partitionSpec = PartitionSpec.builderFor(schema) @@ -112,7 +130,7 @@ public class IcebergTransactionTest { icebergCatalog.createTable(TableIdentifier.of(dbName, tbWithoutPartition), schema); } - public List<String> createPartitionValues() { + private List<String> createPartitionValues() { Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z"); long ts = DateTimeUtil.microsFromInstant(instant); @@ -165,14 +183,23 @@ public class IcebergTransactionTest { ctdList.add(ctd1); ctdList.add(ctd2); + Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition)); + + new MockUp<IcebergUtils>() { + @Mock + public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) { + return table; + } + }; + IcebergTransaction txn = getTxn(); txn.updateIcebergCommitData(ctdList); - txn.beginInsert(dbName, tbWithPartition); - txn.finishInsert(); + SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition); + txn.beginInsert(tableInfo); + txn.finishInsert(tableInfo, Optional.empty()); txn.commit(); - Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithPartition)); - checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6"); + checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6"); checkPushDownByPartitionForTs(table, "ts1"); checkPushDownByPartitionForTs(table, "ts2"); checkPushDownByPartitionForTs(table, "ts3"); @@ -189,7 +216,7 @@ public class IcebergTransactionTest { checkPushDownByPartitionForBucketInt(table, "int1"); } - public void checkPushDownByPartitionForBucketInt(Table table, String column) { + private void checkPushDownByPartitionForBucketInt(Table table, String column) { // (BucketUtil.hash(15) & Integer.MAX_VALUE) % 2 = 0 Integer i1 = 15; @@ -212,12 +239,12 @@ public class IcebergTransactionTest { checkPushDownByPartition(table, greaterThan2, 2); } - public void checkPushDownByPartitionForString(Table table, String column) { + private void checkPushDownByPartitionForString(Table table, String column) { // Since the string used to create the partition is in date format, the date check can be reused directly checkPushDownByPartitionForDt(table, column); } - public void 
checkPushDownByPartitionForTs(Table table, String column) { + private void checkPushDownByPartitionForTs(Table table, String column) { String lessTs = "2023-12-11T12:34:56.123456"; String eqTs = "2024-12-11T12:34:56.123456"; String greaterTs = "2025-12-11T12:34:56.123456"; @@ -230,7 +257,7 @@ public class IcebergTransactionTest { checkPushDownByPartition(table, greaterThan, 0); } - public void checkPushDownByPartitionForDt(Table table, String column) { + private void checkPushDownByPartitionForDt(Table table, String column) { String less = "2023-12-11"; String eq = "2024-12-11"; String greater = "2025-12-11"; @@ -243,7 +270,7 @@ public class IcebergTransactionTest { checkPushDownByPartition(table, greaterThan, 0); } - public void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) { + private void checkPushDownByPartition(Table table, Expression expr, Integer expectFiles) { CloseableIterable<FileScanTask> fileScanTasks = table.newScan().filter(expr).planFiles(); AtomicReference<Integer> cnt = new AtomicReference<>(0); fileScanTasks.forEach(notUse -> cnt.updateAndGet(v -> v + 1)); @@ -268,45 +295,64 @@ public class IcebergTransactionTest { ctdList.add(ctd1); ctdList.add(ctd2); + Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition)); + new MockUp<IcebergUtils>() { + @Mock + public Table getRemoteTable(ExternalCatalog catalog, SimpleTableInfo tableInfo) { + return table; + } + }; + IcebergTransaction txn = getTxn(); txn.updateIcebergCommitData(ctdList); - txn.beginInsert(dbName, tbWithoutPartition); - txn.finishInsert(); + SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbWithPartition); + txn.beginInsert(tableInfo); + txn.finishInsert(tableInfo, Optional.empty()); txn.commit(); - Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition)); checkSnapshotProperties(table.currentSnapshot().summary(), "6", "2", "6"); } - public void checkSnapshotProperties(Map<String, String> props, - String addRecords, - String addFileCnt, - String addFileSize) { + private IcebergTransaction getTxn() { + return new IcebergTransaction(ops); + } + + private void checkSnapshotProperties(Map<String, String> props, + String addRecords, + String addFileCnt, + String addFileSize) { Assert.assertEquals(addRecords, props.get("added-records")); Assert.assertEquals(addFileCnt, props.get("added-data-files")); Assert.assertEquals(addFileSize, props.get("added-files-size")); } - public String numToYear(Integer num) { + private String numToYear(Integer num) { Transform<Object, Integer> year = Transforms.year(); return year.toHumanString(Types.IntegerType.get(), num); } - public String numToMonth(Integer num) { + private String numToMonth(Integer num) { Transform<Object, Integer> month = Transforms.month(); return month.toHumanString(Types.IntegerType.get(), num); } - public String numToDay(Integer num) { + private String numToDay(Integer num) { Transform<Object, Integer> day = Transforms.day(); return day.toHumanString(Types.IntegerType.get(), num); } - public String numToHour(Integer num) { + private String numToHour(Integer num) { Transform<Object, Integer> hour = Transforms.hour(); return hour.toHumanString(Types.IntegerType.get(), num); } + @Test + public void tableCloneTest() { + Table table = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbWithoutPartition)); + Table cloneTable = (Table) SerializationUtils.clone((Serializable) table); + Assert.assertNotNull(cloneTable); + } + @Test public void testTransform() { 
Instant instant = Instant.parse("2024-12-11T12:34:56.123456Z"); @@ -322,7 +368,4 @@ public class IcebergTransactionTest { Assert.assertEquals("2024-12-11", numToDay(dt)); } - public IcebergTransaction getTxn() { - return new IcebergTransaction(ops); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
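
For reference, the ordinals produced by the Year/Month/Day/Hour transforms map back to the human-readable strings used in partition paths via Iceberg's Transforms helpers, exactly as the numToYear/numToMonth/numToDay/numToHour helpers in the test do. A small self-contained sketch; the printed outputs are assumptions based on Iceberg's transform formats, except "2024-12-11", which the test itself asserts:

    import org.apache.iceberg.transforms.Transform;
    import org.apache.iceberg.transforms.Transforms;
    import org.apache.iceberg.types.Types;

    import java.time.LocalDate;

    public class TransformHumanStringSketch {
        public static void main(String[] args) {
            // Ordinal produced by the Day transform: whole days since 1970-01-01.
            int dayOrdinal = (int) LocalDate.of(2024, 12, 11).toEpochDay();
            Transform<Object, Integer> day = Transforms.day();
            System.out.println(day.toHumanString(Types.IntegerType.get(), dayOrdinal));   // "2024-12-11"

            // Ordinal produced by the Hour transform: whole hours since the epoch.
            int hourOrdinal = dayOrdinal * 24 + 12;
            Transform<Object, Integer> hour = Transforms.hour();
            System.out.println(hour.toHumanString(Types.IntegerType.get(), hourOrdinal)); // expected "2024-12-11-12"
        }
    }
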