This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 6b3ce8c9185 [feature](be) Support Iceberg equality deletes in reader
(#63852)
6b3ce8c9185 is described below
commit 6b3ce8c9185f7f837e2f26645f444e316fbd2c5b
Author: Gabriel <[email protected]>
AuthorDate: Thu May 28 23:16:41 2026 +0800
[feature](be) Support Iceberg equality deletes in reader (#63852)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary: Implement Iceberg equality delete filtering in the v2
Iceberg reader by materializing equality delete keys as delete predicate
expressions and applying them through the file reader filter path.
### Release note
Support reading Iceberg equality delete files in the BE Iceberg reader.
### Check List (For Author)
- Test: Unit Test / Manual test
- Added EqualityDeletePredicateTest for single-column, multi-column,
null matching, and error handling.
- Manual test: git diff --check.
- Not run: run-be-ut.sh failed because it requires JDK 17 but this
environment only has JDK 11; the clang-format script failed because
llvm@16 is not installed.
- Behavior changed: Yes, Iceberg reader now filters equality-deleted
rows.
- Does this need documentation: No
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/format/new_parquet/parquet_reader.cpp | 8 +-
.../reader/expr/equality_delete_predicate.cpp | 158 ++++++++++++++
.../format/reader/expr/equality_delete_predicate.h | 71 +++++++
be/src/format/reader/table_reader.cpp | 2 +-
be/src/format/reader/table_reader.h | 4 +-
be/src/format/table/iceberg_reader_v2.cpp | 229 ++++++++++++++++-----
be/src/format/table/iceberg_reader_v2.h | 51 ++---
.../reader/expr/equality_delete_predicate_test.cpp | 181 ++++++++++++++++
be/test/format/reader/table_reader_test.cpp | 19 +-
9 files changed, 634 insertions(+), 89 deletions(-)
diff --git a/be/src/format/new_parquet/parquet_reader.cpp
b/be/src/format/new_parquet/parquet_reader.cpp
index 489e184cd2b..5e4107d727d 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -339,9 +339,9 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t
batch_rows, Block* file_
RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
file_block, filter.data(),
static_cast<size_t>(batch_rows), false,
&can_filter_all));
- *selected_rows = can_filter_all ? 0
- :
_apply_filter_to_selection(filter, selection,
-
*selected_rows);
+ *selected_rows =
+ can_filter_all ? 0
+ : _apply_filter_to_selection(filter,
selection, *selected_rows);
}
if (*selected_rows == 0) {
break;
@@ -367,7 +367,7 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t
batch_rows, Block* file_
file_block->erase(result_column_id);
*selected_rows =
!has_kept_row ? 0
- : _apply_filter_to_selection(keep_filter,
selection, *selected_rows);
+ : _apply_filter_to_selection(keep_filter,
selection, *selected_rows);
}
return Status::OK();
}
diff --git a/be/src/format/reader/expr/equality_delete_predicate.cpp
b/be/src/format/reader/expr/equality_delete_predicate.cpp
new file mode 100644
index 00000000000..2b714abade7
--- /dev/null
+++ b/be/src/format/reader/expr/equality_delete_predicate.cpp
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/reader/expr/equality_delete_predicate.h"
+
+#include <gen_cpp/Exprs_types.h>
+
+#include <utility>
+
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/block/column_with_type_and_name.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_number.h"
+
+namespace doris {
+namespace {
+
+// Returns true when lhs[lhs_row] and rhs[rhs_row] hold the same key value.
+// When both columns have the same nullability, ColumnNullable/IColumn::compare_at decides
+// directly (NULL-vs-NULL handling is delegated to ColumnNullable::compare_at; assumed to
+// report 0 for two NULLs -- TODO confirm). With mixed nullability, a NULL on the nullable
+// side never matches, and otherwise the nested column is compared with the plain one.
+bool column_value_equal(const ColumnPtr& lhs, size_t lhs_row, const ColumnPtr& rhs,
+                        size_t rhs_row) {
+    const bool lhs_nullable = lhs->is_nullable();
+    const bool rhs_nullable = rhs->is_nullable();
+    if (lhs_nullable == rhs_nullable) {
+        return lhs->compare_at(lhs_row, rhs_row, *rhs, -1) == 0;
+    }
+    if (rhs_nullable) {
+        const auto& nullable_rhs = assert_cast<const ColumnNullable&>(*rhs);
+        if (nullable_rhs.is_null_at(rhs_row)) {
+            return false;
+        }
+        return lhs->compare_at(lhs_row, rhs_row, nullable_rhs.get_nested_column(), -1) == 0;
+    }
+    const auto& nullable_lhs = assert_cast<const ColumnNullable&>(*lhs);
+    if (nullable_lhs.is_null_at(lhs_row)) {
+        return false;
+    }
+    return nullable_lhs.get_nested_column().compare_at(lhs_row, rhs_row, *rhs, -1) == 0;
+}
+
+} // namespace
+
+// Takes ownership of the materialized delete keys (column i belongs to field_ids[i]) and
+// indexes every delete row by the hash of its key tuple.
+EqualityDeletePredicate::EqualityDeletePredicate(Block delete_block, std::vector<int> field_ids)
+        : VExpr(), _delete_block(std::move(delete_block)), _field_ids(std::move(field_ids)) {
+    // Describe this node as a boolean DELETE predicate.
+    _expr_name = "EqualityDeletePredicate";
+    _data_type = std::make_shared<DataTypeBool>();
+    _node_type = TExprNodeType::PREDICATE;
+    _opcode = TExprOpcode::DELETE;
+    // Exactly one key column per equality field id.
+    DCHECK_EQ(_delete_block.columns(), _field_ids.size());
+
+    // Different key tuples may share a hash, so execute() confirms each candidate with _equal().
+    _delete_hashes = _build_hashes(_delete_block);
+    size_t delete_row = 0;
+    for (const uint64_t hash : _delete_hashes) {
+        _delete_hash_map.emplace(hash, delete_row);
+        ++delete_row;
+    }
+}
+
+// Prepares the child key exprs through the base class, then marks this predicate prepared.
+Status EqualityDeletePredicate::prepare(RuntimeState* state, const RowDescriptor& desc,
+                                        VExprContext* context) {
+    // Propagates any error from VExpr::prepare (the macro may also return early when this
+    // expr was already prepared -- presumably; see RETURN_IF_ERROR_OR_PREPARED).
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
+    _prepare_finished = true;
+    _expr_name = "EqualityDeletePredicate";
+    return Status::OK();
+}
+
+// Opens every key child expr (slot refs, optionally wrapped in casts by the Iceberg reader)
+// and, for the fragment-local scope, lets VExpr compute its constant column state.
+Status EqualityDeletePredicate::open(RuntimeState* state, VExprContext* context,
+                                     FunctionContext::FunctionStateScope scope) {
+    DCHECK(_prepare_finished);
+    for (size_t i = 0; i < _children.size(); ++i) {
+        RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+    }
+    const bool fragment_local = scope == FunctionContext::FRAGMENT_LOCAL;
+    if (fragment_local) {
+        RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
+    }
+    _open_finished = true;
+    return Status::OK();
+}
+
+// Nothing predicate-specific to release here: the delete block and hash index are plain
+// members freed with the object, so closing is fully delegated to VExpr.
+void EqualityDeletePredicate::close(VExprContext* context,
+                                    FunctionContext::FunctionStateScope scope) {
+    return VExpr::close(context, scope);
+}
+
+// Evaluates the predicate over `block`: appends a ColumnBool that is true for every row whose
+// key tuple (one value per child expr, in field-id order) equals some row of _delete_block,
+// and reports the position of that column through `result_column_id`.
+Status EqualityDeletePredicate::execute(VExprContext* context, Block* block,
+                                        int* result_column_id) const {
+    if (_children.size() != _field_ids.size()) {
+        return Status::InternalError(
+                "EqualityDeletePredicate should have {} child exprs, but got {}",
+                _field_ids.size(), _children.size());
+    }
+
+    // Column i of data_key_block lines up with column i of _delete_block.
+    Block data_key_block;
+    for (const auto& child : _children) {
+        int slot = -1;
+        RETURN_IF_ERROR(child->execute(context, block, &slot));
+        const auto& key_column = block->get_by_position(slot);
+        // Materialize constant columns so hashing and the row-wise compare_at() in _equal()
+        // operate on ordinary full columns, like the delete-key columns they are compared
+        // with. Non-const columns are expected to be returned unchanged.
+        data_key_block.insert({key_column.column->convert_to_full_column_if_const(),
+                               key_column.type, key_column.name});
+    }
+
+    const auto rows = data_key_block.rows();
+    auto res_col = ColumnBool::create(rows, 0);
+    // With no delete keys or no data rows nothing can match; the result stays all-false.
+    if (!_delete_hash_map.empty() && rows > 0) {
+        const auto data_hashes = _build_hashes(data_key_block);
+        auto& result_data = res_col->get_data();
+        for (size_t row = 0; row < rows; ++row) {
+            // Hash hits are only candidates; confirm with an exact key comparison.
+            const auto range = _delete_hash_map.equal_range(data_hashes[row]);
+            for (auto it = range.first; it != range.second; ++it) {
+                if (_equal(data_key_block, row, it->second)) {
+                    result_data[row] = true;
+                    break;
+                }
+            }
+        }
+    }
+
+    block->insert({std::move(res_col), std::make_shared<DataTypeBool>(), expr_name()});
+    *result_column_id = static_cast<int>(block->columns() - 1);
+    return Status::OK();
+}
+
+// Folds every column of `block`, left to right, into one hash per row (zero seed).
+std::vector<uint64_t> EqualityDeletePredicate::_build_hashes(const Block& block) {
+    std::vector<uint64_t> row_hashes(block.rows(), 0);
+    const size_t column_count = block.columns();
+    for (size_t col = 0; col < column_count; ++col) {
+        block.get_by_position(col).column->update_hashes_with_value(row_hashes.data(), nullptr);
+    }
+    return row_hashes;
+}
+
+// Exact comparison of data_block[data_row] against _delete_block[delete_row] over every key
+// column; used to resolve hash collisions found in execute().
+bool EqualityDeletePredicate::_equal(const Block& data_block, size_t data_row,
+                                     size_t delete_row) const {
+    const size_t key_count = _delete_block.columns();
+    for (size_t idx = 0; idx != key_count; ++idx) {
+        const bool same = column_value_equal(data_block.get_by_position(idx).column, data_row,
+                                             _delete_block.get_by_position(idx).column,
+                                             delete_row);
+        if (!same) {
+            return false;
+        }
+    }
+    return true;
+}
+
+// The display name assigned in the constructor is the whole debug description.
+std::string EqualityDeletePredicate::debug_string() const {
+    return expr_name();
+}
+
+} // namespace doris
diff --git a/be/src/format/reader/expr/equality_delete_predicate.h
b/be/src/format/reader/expr/equality_delete_predicate.h
new file mode 100644
index 00000000000..2e33cffb398
--- /dev/null
+++ b/be/src/format/reader/expr/equality_delete_predicate.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "core/block/block.h"
+#include "exprs/function_context.h"
+#include "exprs/vexpr.h"
+
+namespace doris {
+class RowDescriptor;
+class RuntimeState;
+class VExprContext;
+} // namespace doris
+
+namespace doris {
+
+// Boolean DELETE predicate for Iceberg equality deletes: evaluates to true for every data row
+// whose key tuple equals some row of the materialized delete-key block.
+class EqualityDeletePredicate final : public VExpr {
+    ENABLE_FACTORY_CREATOR(EqualityDeletePredicate);
+
+public:
+    // Column i of `delete_block` holds the delete-file values for `field_ids[i]`; the i-th
+    // child expr added later must produce the matching data column.
+    EqualityDeletePredicate(Block delete_block, std::vector<int> field_ids);
+    ~EqualityDeletePredicate() override = default;
+
+    // Appends a ColumnBool (true = row matches a delete key) to `block`.
+    Status execute(VExprContext* context, Block* block, int* result_column_id) const override;
+    Status execute_column_impl(VExprContext* context, const Block* block, const Selector* selector,
+                               size_t count, ColumnPtr& result_column) const override {
+        return Status::InternalError(
+                "Not implement EqualityDeletePredicate::execute_column_impl");
+    }
+    Status prepare(RuntimeState* state, const RowDescriptor& desc, VExprContext* context) override;
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override;
+    void close(VExprContext* context, FunctionContext::FunctionStateScope scope) override;
+    std::string debug_string() const override;
+    uint64_t get_digest(uint64_t seed) const override { return 0; }
+    const std::string& expr_name() const override { return _expr_name; }
+
+private:
+    // One hash per row, folding all columns of `block`.
+    static std::vector<uint64_t> _build_hashes(const Block& block);
+    // Exact key comparison used to resolve hash collisions.
+    bool _equal(const Block& data_block, size_t data_row, size_t delete_row) const;
+
+    std::string _expr_name;
+    Block _delete_block;
+    std::vector<int> _field_ids;
+    std::vector<uint64_t> _delete_hashes;
+    // Key-tuple hash -> delete row index. Only point lookups (equal_range) are performed, so an
+    // unordered container gives O(1) average probes per data row instead of O(log n).
+    std::unordered_multimap<uint64_t, size_t> _delete_hash_map;
+};
+
+} // namespace doris
diff --git a/be/src/format/reader/table_reader.cpp
b/be/src/format/reader/table_reader.cpp
index 0735cc51f38..8289d637d78 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -21,8 +21,8 @@
#include <gen_cpp/Types_types.h>
#include <cstring>
-#include <stdexcept>
#include <set>
+#include <stdexcept>
#include <vector>
#include "common/cast_set.h"
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index f94e98bd837..de7626dfb24 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -139,7 +139,9 @@ public:
// 子类可以在自己的 init(options) 中调用该方法;这里不接收具体表格式 schema/task。
virtual Status init(TableReadOptions options);
- // 读取当前 split/partition 之前初始化。
+ // Prepare for reading a new split/task.
+ // 1. Pass a new split/task to reader, which will be used in subsequent
open_reader() to initialize the underlying file reader.
+ // 2. Parse delete predicates from split/task information, which will be
used for later dynamic filtering and delete handling.
virtual Status prepare_split(const SplitReadOptions& options);
// table-level 动态过滤入口。
diff --git a/be/src/format/table/iceberg_reader_v2.cpp
b/be/src/format/table/iceberg_reader_v2.cpp
index ad72313cc89..ed6649fce2c 100644
--- a/be/src/format/table/iceberg_reader_v2.cpp
+++ b/be/src/format/table/iceberg_reader_v2.cpp
@@ -22,6 +22,7 @@
#include <memory>
#include <utility>
+#include "common/cast_set.h"
#include "core/assert_cast.h"
#include "core/block/block.h"
#include "core/column/column_const.h"
@@ -33,30 +34,32 @@
#include "core/field.h"
#include "format/new_parquet/column_reader.h"
#include "format/new_parquet/parquet_reader.h"
+#include "format/reader/expr/cast.h"
+#include "format/reader/expr/equality_delete_predicate.h"
+#include "format/reader/expr/slot_ref.h"
#include "format/reader/table_reader.h"
#include "format/table/deletion_vector_reader.h"
#include "io/file_factory.h"
namespace doris::iceberg {
-IcebergTableReader::PositionDeleteBlockCollector::PositionDeleteBlockCollector(
- std::string data_file_path, std::map<std::string, reader::DeleteRows>*
rows)
+IcebergTableReader::PositionDeleteRowsCollector::PositionDeleteRowsCollector(
+ std::string data_file_path, reader::DeleteRows* rows)
: _data_file_path(std::move(data_file_path)), _rows(rows) {}
-Status IcebergTableReader::PositionDeleteBlockCollector::collect(const Block&
block,
- size_t
read_rows) {
+Status IcebergTableReader::PositionDeleteRowsCollector::collect(const Block&
block,
+ size_t
read_rows) {
if (read_rows == 0) {
return Status::OK();
}
const auto& file_path_column = assert_cast<const ColumnString&>(
*block.get_by_position(ICEBERG_FILE_PATH_BLOCK_POSITION).column);
- const auto& pos_column =
- assert_cast<const
ColumnInt64&>(*block.get_by_position(ICEBERG_ROW_POS_BLOCK_POSITION)
- .column);
+ const auto& pos_column = assert_cast<const ColumnInt64&>(
+ *block.get_by_position(ICEBERG_ROW_POS_BLOCK_POSITION).column);
for (size_t row = 0; row < read_rows; ++row) {
const auto file_path = file_path_column.get_data_at(row).to_string();
if (file_path == _data_file_path) {
- (*_rows)[file_path].push_back(pos_column.get_element(row));
+ _rows->push_back(pos_column.get_element(row));
}
}
return Status::OK();
@@ -67,7 +70,7 @@ Status IcebergTableReader::prepare_split(const
reader::SplitReadOptions& options
_iceberg_params = nullptr;
_delete_predicates_initialized = false;
_position_delete_rows_storage.clear();
- _equality_delete_files.clear();
+ _equality_delete_filters.clear();
if (options.current_range.__isset.table_format_params &&
options.current_range.table_format_params.__isset.iceberg_params) {
const auto& iceberg_params =
options.current_range.table_format_params.iceberg_params;
@@ -81,13 +84,7 @@ Status IcebergTableReader::prepare_split(const
reader::SplitReadOptions& options
}
}
RETURN_IF_ERROR(TableReader::prepare_split(options));
- return
_collect_position_delete_rows(options.current_range.table_format_params);
-}
-
-Status IcebergTableReader::finalize_chunk(Block* block, const size_t rows) {
- RETURN_IF_ERROR(reader::TableReader::finalize_chunk(block, rows));
- RETURN_IF_ERROR(apply_equality_deletes(block));
- return Status::OK();
+ return _init_delete_predicates(options.current_range.table_format_params);
}
Status IcebergTableReader::materialize_virtual_columns(Block* table_block) {
@@ -114,6 +111,7 @@ Status
IcebergTableReader::customize_file_scan_request(reader::FileScanRequest*
if (_row_lineage_columns.first_row_id >= 0 && _need_row_lineage_row_id()) {
RETURN_IF_ERROR(_append_row_position_output_column(file_request));
}
+ RETURN_IF_ERROR(_append_equality_delete_predicates(file_request));
return Status::OK();
}
@@ -161,7 +159,7 @@ Status
IcebergTableReader::_parse_deletion_vector_file(const TTableFormatFileDes
return Status::OK();
}
-Status IcebergTableReader::_collect_position_delete_rows(const
TTableFormatFileDesc& t_desc) {
+Status IcebergTableReader::_init_delete_predicates(const TTableFormatFileDesc&
t_desc) {
if (!t_desc.__isset.iceberg_params || _delete_predicates_initialized) {
_delete_predicates_initialized = true;
return Status::OK();
@@ -175,6 +173,7 @@ Status
IcebergTableReader::_collect_position_delete_rows(const TTableFormatFileD
}
std::vector<TIcebergDeleteFileDesc> position_delete_files;
+ std::vector<TIcebergDeleteFileDesc> equality_delete_files;
for (const auto& delete_file : iceberg_params.delete_files) {
if (!delete_file.__isset.content) {
continue;
@@ -182,29 +181,31 @@ Status
IcebergTableReader::_collect_position_delete_rows(const TTableFormatFileD
if (delete_file.content == POSITION_DELETE) {
position_delete_files.push_back(delete_file);
} else if (delete_file.content == EQUALITY_DELETE) {
- _equality_delete_files.push_back(delete_file);
+ equality_delete_files.push_back(delete_file);
}
}
+ // `_delete_rows != nullptr` means DeleteVector is parsed
if (_delete_rows != nullptr) {
_position_delete_rows_storage = *_delete_rows;
_delete_rows = &_position_delete_rows_storage;
}
+ // Combine position delete rows from both deletion vector and position
delete files, and
+ // initialize equality delete predicates. Position delete files contain
row positions of
+ // deleted rows, which can be directly added to `_delete_rows`. Equality
delete files contain
+ // values of deleted rows, which require reading the files and building
predicates for later
+ // filtering.
if (!position_delete_files.empty()) {
- RETURN_IF_ERROR(_read_position_delete_files(position_delete_files));
+ RETURN_IF_ERROR(_init_position_delete_rows(position_delete_files));
+ }
+ if (!equality_delete_files.empty()) {
+
RETURN_IF_ERROR(_init_equality_delete_predicates(equality_delete_files));
}
_delete_predicates_initialized = true;
return Status::OK();
}
-Status IcebergTableReader::apply_equality_deletes(Block* block) {
- if (!_equality_delete_files.empty()) {
- return Status::NotSupported("Iceberg equality delete is not supported
by TableReader");
- }
- return Status::OK();
-}
-
std::string IcebergTableReader::_iceberg_delete_vector_cache_key(
const TIcebergDeleteFileDesc& delete_file) {
const std::string key_prefix = "iceberg_dv:";
@@ -259,14 +260,6 @@ const reader::SchemaField*
IcebergTableReader::_find_delete_field(
return nullptr;
}
-Block IcebergTableReader::_build_position_delete_block(const
reader::SchemaField& file_path_field,
- const
reader::SchemaField& pos_field) {
- Block block;
- block.insert({file_path_field.type->create_column(), file_path_field.type,
ICEBERG_FILE_PATH});
- block.insert({pos_field.type->create_column(), pos_field.type,
ICEBERG_ROW_POS});
- return block;
-}
-
Status
IcebergTableReader::_append_row_position_output_column(reader::FileScanRequest*
request) {
const auto row_position_column_id =
doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID;
@@ -275,6 +268,46 @@ Status
IcebergTableReader::_append_row_position_output_column(reader::FileScanRe
return Status::OK();
}
+// Adds one expression filter per equality delete file to `request`. Each filter wraps an
+// EqualityDeletePredicate whose children read the matching data-file key columns (cast to
+// the delete file's key type when the types differ).
+Status IcebergTableReader::_append_equality_delete_predicates(reader::FileScanRequest* request) {
+    DORIS_CHECK(request != nullptr);
+    const auto& file_schema = _data_reader.file_schema;
+    // Locates the data-file field whose leaf field id equals `field_id`.
+    auto find_data_field = [&file_schema](int field_id) {
+        return std::find_if(file_schema.begin(), file_schema.end(),
+                            [field_id](const reader::SchemaField& field) {
+                                return !field.field_id_path.empty() &&
+                                       field.field_id_path.back() == field_id;
+                            });
+    };
+
+    for (const auto& filter : _equality_delete_filters) {
+        auto delete_predicate =
+                std::make_shared<EqualityDeletePredicate>(filter.delete_block, filter.field_ids);
+        reader::FileExpressionFilter expression_filter;
+        expression_filter.delete_conjunct = VExprContext::create_shared(delete_predicate);
+        DCHECK_EQ(filter.field_ids.size(), filter.key_types.size());
+        for (size_t idx = 0; idx < filter.field_ids.size(); ++idx) {
+            const int field_id = filter.field_ids[idx];
+            const auto field_it = find_data_field(field_id);
+            if (field_it == file_schema.end()) {
+                return Status::InternalError(
+                        "Can not find equality delete column field id {} in data file schema",
+                        field_id);
+            }
+            // Read the key column as a predicate column and reference it by block position.
+            _append_file_scan_column(request, field_it->id, &request->predicate_columns);
+            const auto block_position = request->column_positions.at(field_it->id);
+            auto slot = TableSlotRef::create_shared(cast_set<int>(block_position),
+                                                    cast_set<int>(block_position), -1,
+                                                    field_it->type, field_it->name);
+            const auto& key_type = filter.key_types[idx];
+            if (!field_it->type->equals(*key_type)) {
+                // Bring the data column to the delete file's key type before comparing.
+                auto cast_expr = Cast::create_shared(key_type);
+                cast_expr->add_child(std::move(slot));
+                delete_predicate->add_child(std::move(cast_expr));
+            } else {
+                delete_predicate->add_child(std::move(slot));
+            }
+            expression_filter.file_column_ids.push_back(field_it->id);
+        }
+        request->expression_filters.push_back(std::move(expression_filter));
+    }
+    return Status::OK();
+}
+
std::string IcebergTableReader::_data_file_path() const {
if (_iceberg_params != nullptr &&
_iceberg_params->__isset.original_file_path) {
return _iceberg_params->original_file_path;
@@ -286,7 +319,7 @@ std::string IcebergTableReader::_data_file_path() const {
Status IcebergTableReader::_read_parquet_position_delete_file(
const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams&
scan_params,
- IcebergDeleteFileIOContext* delete_io_ctx,
PositionDeleteBlockCollector* collector) {
+ IcebergDeleteFileIOContext* delete_io_ctx,
PositionDeleteRowsCollector* collector) {
if (!delete_file.__isset.file_format) {
return Status::InternalError("Iceberg position delete file is missing
file format");
}
@@ -326,8 +359,16 @@ Status
IcebergTableReader::_read_parquet_position_delete_file(
RETURN_IF_ERROR(reader.open(request));
bool eof = false;
+ auto build_position_delete_block = [](const reader::SchemaField&
file_path_field,
+ const reader::SchemaField&
pos_field) -> Block {
+ Block block;
+ block.insert(
+ {file_path_field.type->create_column(), file_path_field.type,
ICEBERG_FILE_PATH});
+ block.insert({pos_field.type->create_column(), pos_field.type,
ICEBERG_ROW_POS});
+ return block;
+ };
while (!eof) {
- Block block = _build_position_delete_block(*file_path_field,
*pos_field);
+ Block block = build_position_delete_block(*file_path_field,
*pos_field);
size_t read_rows = 0;
RETURN_IF_ERROR(reader.get_block(&block, &read_rows, &eof));
RETURN_IF_ERROR(collector->collect(block, read_rows));
@@ -335,35 +376,127 @@ Status
IcebergTableReader::_read_parquet_position_delete_file(
return reader.close();
}
-Status IcebergTableReader::_read_position_delete_files(
+Status IcebergTableReader::_init_position_delete_rows(
const std::vector<TIcebergDeleteFileDesc>& delete_files) {
TFileScanRangeParams delete_scan_params =
_scan_params == nullptr ? TFileScanRangeParams() : *_scan_params;
- std::map<std::string, reader::DeleteRows> rows_by_file;
+ reader::DeleteRows position_delete_rows;
const auto data_file_path = _data_file_path();
IcebergDeleteFileIOContext delete_io_ctx(_runtime_state);
- PositionDeleteBlockCollector collector(data_file_path, &rows_by_file);
+ PositionDeleteRowsCollector collector(data_file_path,
&position_delete_rows);
for (const auto& delete_file : delete_files) {
RETURN_IF_ERROR(_read_parquet_position_delete_file(delete_file,
delete_scan_params,
&delete_io_ctx,
&collector));
}
- auto rows_it = rows_by_file.find(data_file_path);
- if (rows_it == rows_by_file.end()) {
+ if (position_delete_rows.empty()) {
return Status::OK();
}
// Position delete files and deletion vectors both become row-position
deletes for the
// common TableReader DeletePredicate path. Keep the merged rows in a
member vector because
// DeletePredicate stores a reference to the vector used by _delete_rows.
_position_delete_rows_storage.insert(_position_delete_rows_storage.end(),
- rows_it->second.begin(),
rows_it->second.end());
+ position_delete_rows.begin(),
position_delete_rows.end());
std::sort(_position_delete_rows_storage.begin(),
_position_delete_rows_storage.end());
-
_position_delete_rows_storage.erase(std::unique(_position_delete_rows_storage.begin(),
-
_position_delete_rows_storage.end()),
- _position_delete_rows_storage.end());
+ _position_delete_rows_storage.erase(
+ std::unique(_position_delete_rows_storage.begin(),
_position_delete_rows_storage.end()),
+ _position_delete_rows_storage.end());
_delete_rows = &_position_delete_rows_storage;
return Status::OK();
}
+// Reads every equality delete file and records one EqualityDeleteFilter per file.
+Status IcebergTableReader::_init_equality_delete_predicates(
+        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
+    // Reuse the data scan's params when available, otherwise fall back to defaults.
+    TFileScanRangeParams delete_scan_params;
+    if (_scan_params != nullptr) {
+        delete_scan_params = *_scan_params;
+    }
+    // One IO context is shared by all delete files read in this call.
+    IcebergDeleteFileIOContext delete_io_ctx(_runtime_state);
+    for (const auto& delete_file : delete_files) {
+        RETURN_IF_ERROR(_read_parquet_equality_delete_file(delete_file, delete_scan_params,
+                                                           &delete_io_ctx));
+    }
+    return Status::OK();
+}
+
+// Reads one Parquet equality delete file, materializes its key columns (one per equality field
+// id, in the order given by `delete_file.field_ids`) into a single Block, and appends the result
+// to _equality_delete_filters.
+// Returns InternalError for a missing format/field ids/field, NotSupported for non-Parquet files
+// or complex key columns.
+Status IcebergTableReader::_read_parquet_equality_delete_file(
+        const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& scan_params,
+        IcebergDeleteFileIOContext* delete_io_ctx) {
+    if (!delete_file.__isset.file_format) {
+        return Status::InternalError("Iceberg equality delete file is missing file format");
+    }
+    if (delete_file.file_format != TFileFormatType::FORMAT_PARQUET) {
+        return Status::NotSupported("Unsupported Iceberg equality delete file format {}",
+                                    delete_file.file_format);
+    }
+    if (!delete_file.__isset.field_ids || delete_file.field_ids.empty()) {
+        return Status::InternalError("Iceberg equality delete file is missing field ids");
+    }
+
+    auto delete_range = build_iceberg_delete_file_range(delete_file.path);
+    // Reuse the data file's fs_name, when known, for the delete file range.
+    if (_current_task != nullptr && _current_task->data_file != nullptr &&
+        !_current_task->data_file->fs_name.empty()) {
+        delete_range.__set_fs_name(_current_task->data_file->fs_name);
+    }
+    auto system_properties = _delete_file_system_properties(scan_params);
+    auto file_description = _delete_file_description(delete_range);
+    // Non-owning shared_ptr (no-op deleter): the IOContext belongs to `delete_io_ctx`.
+    std::shared_ptr<io::IOContext> io_ctx(&delete_io_ctx->io_ctx, [](io::IOContext*) {});
+    parquet::ParquetReader reader(system_properties, file_description, io_ctx, _scanner_profile);
+    RETURN_IF_ERROR(reader.init(_runtime_state));
+
+    // Resolve every equality field id to a primitive column of the delete file schema.
+    std::vector<reader::SchemaField> schema;
+    RETURN_IF_ERROR(reader.get_schema(&schema));
+    std::vector<reader::SchemaField> delete_fields;
+    std::vector<int> delete_field_ids;
+    std::vector<DataTypePtr> delete_key_types;
+    delete_fields.reserve(delete_file.field_ids.size());
+    delete_field_ids.reserve(delete_file.field_ids.size());
+    delete_key_types.reserve(delete_file.field_ids.size());
+    for (const auto field_id : delete_file.field_ids) {
+        auto field_it = std::find_if(
+                schema.begin(), schema.end(), [field_id](const reader::SchemaField& field) {
+                    return !field.field_id_path.empty() && field.field_id_path.back() == field_id;
+                });
+        if (field_it == schema.end()) {
+            return Status::InternalError("Can not find field id {} in equality delete file {}",
+                                         field_id, delete_file.path);
+        }
+        if (!field_it->children.empty()) {
+            return Status::NotSupported(
+                    "Iceberg equality delete does not support complex column {}", field_it->name);
+        }
+        delete_fields.push_back(*field_it);
+        delete_field_ids.push_back(field_id);
+        delete_key_types.push_back(field_it->type);
+    }
+
+    // Project exactly the key columns into block positions 0..n-1, in field-id order.
+    auto request = std::make_unique<reader::FileScanRequest>();
+    for (size_t idx = 0; idx < delete_fields.size(); ++idx) {
+        request->non_predicate_columns.push_back(delete_fields[idx].id);
+        request->column_positions.emplace(delete_fields[idx].id, idx);
+    }
+    RETURN_IF_ERROR(reader.open(request));
+
+    // Builds an empty block shaped like the key columns. Takes the fields by const reference
+    // so that creating a block for every batch does not copy the SchemaField vector.
+    auto build_equality_delete_block =
+            [](const std::vector<reader::SchemaField>& fields) -> Block {
+        Block block;
+        for (const auto& field : fields) {
+            block.insert({field.type->create_column(), field.type, field.name});
+        }
+        return block;
+    };
+    // Accumulates every delete key of this file.
+    Block delete_block = build_equality_delete_block(delete_fields);
+    bool eof = false;
+    while (!eof) {
+        Block block = build_equality_delete_block(delete_fields);
+        size_t read_rows = 0;
+        RETURN_IF_ERROR(reader.get_block(&block, &read_rows, &eof));
+        if (read_rows > 0) {
+            // Relies on MutableBlock(Block*) sharing delete_block's columns so that merge()
+            // appends the batch in place.
+            MutableBlock mutable_block(&delete_block);
+            RETURN_IF_ERROR(mutable_block.merge(block));
+        }
+    }
+    RETURN_IF_ERROR(reader.close());
+    _equality_delete_filters.push_back(
+            EqualityDeleteFilter {.field_ids = std::move(delete_field_ids),
+                                  .key_types = std::move(delete_key_types),
+                                  .delete_block = std::move(delete_block)});
+    return Status::OK();
+}
+
Status IcebergTableReader::_materialize_row_lineage_row_id(Block* table_block,
size_t column_idx) {
if (_row_lineage_columns.first_row_id < 0) {
return Status::OK();
@@ -372,9 +505,9 @@ Status
IcebergTableReader::_materialize_row_lineage_row_id(Block* table_block, s
const auto& row_position_column = assert_cast<const ColumnInt64&>(
*_data_reader.block_template.get_by_position(_row_position_block_position).column);
DORIS_CHECK(row_position_column.size() == table_block->rows());
- auto column =
-
table_block->get_by_position(column_idx).column->convert_to_full_column_if_const()
- ->assume_mutable();
+ auto column = table_block->get_by_position(column_idx)
+ .column->convert_to_full_column_if_const()
+ ->assume_mutable();
auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
auto& null_map = nullable_column->get_null_map_data();
auto& data =
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
diff --git a/be/src/format/table/iceberg_reader_v2.h
b/be/src/format/table/iceberg_reader_v2.h
index fbc8e28441b..497a989289a 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -17,14 +17,12 @@
#pragma once
-#include <cstddef>
-#include <cstdint>
-#include <map>
#include <memory>
#include <string>
#include <vector>
#include "common/status.h"
+#include "core/block/block.h"
#include "format/reader/file_reader.h"
#include "format/reader/table_reader.h"
#include "format/table/iceberg_delete_file_reader_helper.h"
@@ -51,11 +49,6 @@ public:
Status prepare_split(const reader::SplitReadOptions& options) override;
protected:
- // 将 file-local block 转换为 table/global schema block。
- // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
- // 物化以及复杂列 remap。
- Status finalize_chunk(Block* block, const size_t rows) override;
-
Status materialize_virtual_columns(Block* table_block) override;
Status customize_file_scan_request(reader::FileScanRequest* file_request)
override;
@@ -63,11 +56,7 @@ protected:
Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc* desc,
bool* has_delete_file) override;
- Status _collect_position_delete_rows(const TTableFormatFileDesc& t_desc);
-
- // 在 table block 上应用 equality delete。
- // equality delete 依赖 table-level 列语义,因此不能下沉到 ParquetReader。
- Status apply_equality_deletes(Block* block);
+ Status _init_delete_predicates(const TTableFormatFileDesc& t_desc);
private:
static constexpr int MIN_SUPPORT_DELETE_FILES_VERSION = 2;
@@ -85,16 +74,15 @@ private:
static constexpr size_t ICEBERG_FILE_PATH_BLOCK_POSITION = 0;
static constexpr size_t ICEBERG_ROW_POS_BLOCK_POSITION = 1;
- class PositionDeleteBlockCollector final {
+ class PositionDeleteRowsCollector final {
public:
- PositionDeleteBlockCollector(std::string data_file_path,
- std::map<std::string,
reader::DeleteRows>* rows);
+ PositionDeleteRowsCollector(std::string data_file_path,
reader::DeleteRows* rows);
Status collect(const Block& block, size_t read_rows);
private:
std::string _data_file_path;
- std::map<std::string, reader::DeleteRows>* _rows = nullptr;
+ reader::DeleteRows* _rows = nullptr;
};
static std::string _iceberg_delete_vector_cache_key(const
TIcebergDeleteFileDesc& delete_file);
@@ -102,27 +90,35 @@ private:
static std::shared_ptr<io::FileSystemProperties>
_delete_file_system_properties(
const TFileScanRangeParams& scan_params);
- static std::unique_ptr<io::FileDescription> _delete_file_description(const
TFileRangeDesc& range);
+ static std::unique_ptr<io::FileDescription> _delete_file_description(
+ const TFileRangeDesc& range);
static const reader::SchemaField* _find_delete_field(
const std::vector<reader::SchemaField>& schema, const std::string&
name);
- static Block _build_position_delete_block(const reader::SchemaField&
file_path_field,
- const reader::SchemaField&
pos_field);
-
Status _append_row_position_output_column(reader::FileScanRequest*
request);
+ Status _append_equality_delete_predicates(reader::FileScanRequest*
request);
+
+ Status _init_equality_delete_predicates(
+ const std::vector<TIcebergDeleteFileDesc>& delete_files);
+
std::string _data_file_path() const;
+ // Read equality/position delete files.
+ Status _read_parquet_equality_delete_file(const TIcebergDeleteFileDesc&
delete_file,
+ const TFileScanRangeParams&
scan_params,
+ IcebergDeleteFileIOContext*
delete_io_ctx);
Status _read_parquet_position_delete_file(const TIcebergDeleteFileDesc&
delete_file,
const TFileScanRangeParams&
scan_params,
IcebergDeleteFileIOContext*
delete_io_ctx,
- PositionDeleteBlockCollector*
collector);
+ PositionDeleteRowsCollector*
collector);
- Status _read_position_delete_files(const
std::vector<TIcebergDeleteFileDesc>& delete_files);
+ // Read position delete files and collect deleted row positions to update
DeletePredicate.
+ Status _init_position_delete_rows(const
std::vector<TIcebergDeleteFileDesc>& delete_files);
+ // Materialize row lineage virtual columns based on the position delete
file.
Status _materialize_row_lineage_row_id(Block* table_block, size_t
column_idx);
-
Status _materialize_row_lineage_last_updated_sequence_number(Block*
table_block,
size_t
column_idx);
@@ -131,7 +127,12 @@ private:
const TIcebergFileDesc* _iceberg_params = nullptr;
bool _delete_predicates_initialized = false;
reader::DeleteRows _position_delete_rows_storage;
- std::vector<TIcebergDeleteFileDesc> _equality_delete_files;
+ struct EqualityDeleteFilter {
+ std::vector<int> field_ids;
+ std::vector<DataTypePtr> key_types;
+ Block delete_block;
+ };
+ std::vector<EqualityDeleteFilter> _equality_delete_filters;
bool _need_row_lineage_row_id() const;
};
diff --git a/be/test/format/reader/expr/equality_delete_predicate_test.cpp
b/be/test/format/reader/expr/equality_delete_predicate_test.cpp
new file mode 100644
index 00000000000..07ff0f78f81
--- /dev/null
+++ b/be/test/format/reader/expr/equality_delete_predicate_test.cpp
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/reader/expr/equality_delete_predicate.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "core/assert_cast.h"
+#include "core/block/block.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/data_type_string.h"
+#include "exprs/vexpr_context.h"
+#include "format/reader/expr/cast.h"
+#include "runtime/descriptors.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "testutil/mock/mock_slot_ref.h"
+
+namespace doris {
+
+class EqualityDeletePredicateTest : public testing::Test {
+protected:
+ static ColumnWithTypeAndName make_nullable_int_column(
+ const std::string& name, const std::vector<std::optional<int>>&
values) {
+ auto data = ColumnInt32::create();
+ auto null_map = ColumnUInt8::create();
+ for (const auto& value : values) {
+ data->insert_value(value.value_or(0));
+ null_map->insert_value(!value.has_value());
+ }
+ auto type = make_nullable(std::make_shared<DataTypeInt32>());
+ return {ColumnNullable::create(std::move(data), std::move(null_map)),
type, name};
+ }
+
+ static ColumnWithTypeAndName make_nullable_string_column(
+ const std::string& name, const
std::vector<std::optional<std::string>>& values) {
+ auto data = ColumnString::create();
+ auto null_map = ColumnUInt8::create();
+ for (const auto& value : values) {
+ const std::string data_value = value.value_or("");
+ data->insert_data(data_value.data(), data_value.size());
+ null_map->insert_value(!value.has_value());
+ }
+ auto type = make_nullable(std::make_shared<DataTypeString>());
+ return {ColumnNullable::create(std::move(data), std::move(null_map)),
type, name};
+ }
+
+ static std::vector<UInt8> result_column_data(const Block& block, int
result_column_id) {
+ const auto& result_column =
+ assert_cast<const
ColumnBool&>(*block.get_by_position(result_column_id).column);
+ return {result_column.get_data().begin(),
result_column.get_data().end()};
+ }
+
+ static Status execute_equality_delete_predicate(Block delete_block,
std::vector<int> field_ids,
+ Block* data_block, int*
result_column_id) {
+ auto predicate =
+
std::make_shared<EqualityDeletePredicate>(std::move(delete_block), field_ids);
+ predicate->_open_finished = true;
+ for (size_t idx = 0; idx < field_ids.size(); ++idx) {
+ predicate->add_child(
+ std::make_shared<MockSlotRef>(idx,
data_block->get_by_position(idx).type));
+ }
+
+ VExprContext context(predicate);
+ return predicate->execute(&context, data_block, result_column_id);
+ }
+
+ static Status execute_prepared_equality_delete_predicate(const
VExprContextSPtr& context,
+ MockRuntimeState*
state,
+ Block* data_block,
+ int*
result_column_id) {
+ RETURN_IF_ERROR(context->prepare(state, RowDescriptor()));
+ RETURN_IF_ERROR(context->open(state));
+ return context->execute(data_block, result_column_id);
+ }
+};
+
+TEST_F(EqualityDeletePredicateTest, MatchSingleColumn) {
+ Block delete_block;
+ delete_block.insert(make_nullable_int_column("id", {1, 4}));
+ Block data_block;
+ data_block.insert(make_nullable_int_column("id", {1, 2, 3, 4}));
+
+ int result_column_id = -1;
+ auto status = execute_equality_delete_predicate(std::move(delete_block),
{1}, &data_block,
+ &result_column_id);
+ ASSERT_TRUE(status.ok()) << status;
+ EXPECT_EQ(result_column_data(data_block, result_column_id),
std::vector<UInt8>({1, 0, 0, 1}));
+}
+
+TEST_F(EqualityDeletePredicateTest, MatchMultipleColumns) {
+ Block delete_block;
+ delete_block.insert(make_nullable_int_column("id", {1, 2}));
+ delete_block.insert(make_nullable_string_column("name", {"a", "b"}));
+ Block data_block;
+ data_block.insert(make_nullable_int_column("id", {1, 1, 2, 2}));
+ data_block.insert(make_nullable_string_column("name", {"a", "b", "a",
"b"}));
+
+ int result_column_id = -1;
+ auto status = execute_equality_delete_predicate(std::move(delete_block),
{1, 2}, &data_block,
+ &result_column_id);
+ ASSERT_TRUE(status.ok()) << status;
+ EXPECT_EQ(result_column_data(data_block, result_column_id),
std::vector<UInt8>({1, 0, 0, 1}));
+}
+
+TEST_F(EqualityDeletePredicateTest, MatchNullValues) {
+ Block delete_block;
+ delete_block.insert(make_nullable_int_column("id", {std::nullopt}));
+ Block data_block;
+ data_block.insert(make_nullable_int_column("id", {1, std::nullopt, 3}));
+
+ int result_column_id = -1;
+ auto status = execute_equality_delete_predicate(std::move(delete_block),
{1}, &data_block,
+ &result_column_id);
+ ASSERT_TRUE(status.ok()) << status;
+ EXPECT_EQ(result_column_data(data_block, result_column_id),
std::vector<UInt8>({0, 1, 0}));
+}
+
+TEST_F(EqualityDeletePredicateTest, MatchAfterCastToDeleteKeyType) {
+ Block delete_block;
+ delete_block.insert(make_nullable_int_column("id", {1, 4}));
+ Block data_block;
+ data_block.insert(ColumnHelper::create_column_with_name<DataTypeInt64>({1,
2, 4}));
+
+ auto predicate =
std::make_shared<EqualityDeletePredicate>(std::move(delete_block),
+
std::vector<int> {1});
+ auto cast_expr =
Cast::create_shared(make_nullable(std::make_shared<DataTypeInt32>()));
+ cast_expr->add_child(std::make_shared<MockSlotRef>(0,
data_block.get_by_position(0).type));
+ predicate->add_child(std::move(cast_expr));
+ auto context = VExprContext::create_shared(predicate);
+ MockRuntimeState state;
+
+ int result_column_id = -1;
+ auto status = execute_prepared_equality_delete_predicate(context, &state,
&data_block,
+
&result_column_id);
+ ASSERT_TRUE(status.ok()) << status;
+ EXPECT_EQ(result_column_data(data_block, result_column_id),
std::vector<UInt8>({1, 0, 1}));
+ context->close();
+}
+
+TEST_F(EqualityDeletePredicateTest, ChildCountMismatchReturnsError) {
+ Block delete_block;
+ delete_block.insert(make_nullable_int_column("id", {1}));
+ auto predicate =
std::make_shared<EqualityDeletePredicate>(std::move(delete_block),
+
std::vector<int> {1});
+ predicate->_open_finished = true;
+ Block data_block;
+ data_block.insert(make_nullable_int_column("id", {1}));
+ VExprContext context(predicate);
+
+ int result_column_id = -1;
+ auto status = predicate->execute(&context, &data_block, &result_column_id);
+ ASSERT_FALSE(status.ok());
+ EXPECT_NE(status.to_string().find("should have 1 child exprs"),
std::string::npos);
+}
+
+} // namespace doris
diff --git a/be/test/format/reader/table_reader_test.cpp
b/be/test/format/reader/table_reader_test.cpp
index 8705775485f..8a72937002d 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -301,9 +301,8 @@ void write_position_delete_parquet_file(const std::string&
file_path,
arrow::field("file_path", arrow::utf8(), false),
arrow::field("pos", arrow::int64(), false),
});
- auto table = arrow::Table::Make(schema,
- {build_string_array(data_file_paths),
- build_int64_array(positions)});
+ auto table = arrow::Table::Make(
+ schema, {build_string_array(data_file_paths),
build_int64_array(positions)});
auto file_result = arrow::io::FileOutputStream::Open(file_path);
ASSERT_TRUE(file_result.ok()) << file_result.status();
@@ -313,9 +312,9 @@ void write_position_delete_parquet_file(const std::string&
file_path,
builder.version(::parquet::ParquetVersion::PARQUET_2_6);
builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
builder.compression(::parquet::Compression::UNCOMPRESSED);
- PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
- *table, arrow::default_memory_pool(), out,
static_cast<int64_t>(positions.size()),
- builder.build()));
+ PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(*table,
arrow::default_memory_pool(), out,
+
static_cast<int64_t>(positions.size()),
+ builder.build()));
}
int64_t write_iceberg_deletion_vector_file(const std::string& file_path,
@@ -423,7 +422,8 @@ std::unique_ptr<ReadProfile>
make_table_read_profile(RuntimeProfile* profile) {
}
TTableFormatFileDesc make_iceberg_table_format_desc(
- const std::string& data_file_path, const
std::vector<TIcebergDeleteFileDesc>& delete_files) {
+ const std::string& data_file_path,
+ const std::vector<TIcebergDeleteFileDesc>& delete_files) {
TTableFormatFileDesc table_format_params;
TIcebergFileDesc iceberg_params;
iceberg_params.__set_format_version(2);
@@ -433,9 +433,8 @@ TTableFormatFileDesc make_iceberg_table_format_desc(
return table_format_params;
}
-std::vector<int32_t> read_iceberg_ids(
- doris::iceberg::IcebergTableReader* reader,
- const std::vector<TableColumn>& projected_columns) {
+std::vector<int32_t> read_iceberg_ids(doris::iceberg::IcebergTableReader*
reader,
+ const std::vector<TableColumn>&
projected_columns) {
std::vector<int32_t> ids;
bool eos = false;
while (!eos) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]