This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 2cfd5031169 [feature](be) Support Iceberg position delete predicates
(#63799)
2cfd5031169 is described below
commit 2cfd5031169f2f37ec7260958cbf928386366ffd
Author: Gabriel <[email protected]>
AuthorDate: Thu May 28 17:36:46 2026 +0800
[feature](be) Support Iceberg position delete predicates (#63799)
Add file-layer DeletePredicate execution for Parquet
row positions and wire IcebergTableReader v2 to convert Iceberg position
deletes and deletion vectors into file-local deleted row positions.
Equality delete files are detected and fail explicitly instead of being
silently ignored.
---
be/src/format/new_parquet/parquet_reader.cpp | 43 ++-
be/src/format/reader/expr/delete_predicate.cpp | 14 +-
be/src/format/reader/table/paimon_reader.cpp | 35 +-
be/src/format/reader/table/paimon_reader.h | 3 +-
be/src/format/reader/table_reader.cpp | 118 +++---
be/src/format/reader/table_reader.h | 163 ++++++---
be/src/format/table/deletion_vector_reader.h | 6 +
be/src/format/table/iceberg_reader_v2.cpp | 396 +++++++++++++++++++-
be/src/format/table/iceberg_reader_v2.h | 198 ++++------
be/test/format/new_parquet/parquet_reader_test.cpp | 94 +++++
be/test/format/reader/table_reader_test.cpp | 401 +++++++++++++++++++++
11 files changed, 1234 insertions(+), 237 deletions(-)
diff --git a/be/src/format/new_parquet/parquet_reader.cpp
b/be/src/format/new_parquet/parquet_reader.cpp
index 043f155dd85..489e184cd2b 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -327,18 +327,47 @@ Status ParquetReader::_execute_filter_conjuncts(int64_t
batch_rows, Block* file_
// predicate columns in the file-local block have been materialized.
for (const auto& expression_filter : _request->expression_filters) {
if (expression_filter.conjunct == nullptr) {
- continue;
+ if (expression_filter.delete_conjunct == nullptr) {
+ continue;
+ }
+ } else {
+ if (*selected_rows == 0) {
+ break;
+ }
+ IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
+ bool can_filter_all = false;
+ RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(
+ file_block, filter.data(),
static_cast<size_t>(batch_rows), false,
+ &can_filter_all));
+ *selected_rows = can_filter_all ? 0
+ :
_apply_filter_to_selection(filter, selection,
+
*selected_rows);
}
if (*selected_rows == 0) {
break;
}
- IColumn::Filter filter(static_cast<size_t>(batch_rows), 1);
- bool can_filter_all = false;
- RETURN_IF_ERROR(expression_filter.conjunct->execute_filter(file_block,
filter.data(),
-
static_cast<size_t>(batch_rows),
- false,
&can_filter_all));
+ if (expression_filter.delete_conjunct == nullptr) {
+ continue;
+ }
+ int result_column_id = -1;
+ RETURN_IF_ERROR(expression_filter.delete_conjunct->root()->execute(
+ expression_filter.delete_conjunct.get(), file_block,
&result_column_id));
+ DORIS_CHECK(result_column_id >= 0 &&
+ result_column_id <
static_cast<int>(file_block->columns()));
+ const auto& delete_filter = assert_cast<const ColumnUInt8&>(
+
*file_block->get_by_position(result_column_id).column)
+ .get_data();
+ DORIS_CHECK(delete_filter.size() == static_cast<size_t>(batch_rows));
+ IColumn::Filter keep_filter(static_cast<size_t>(batch_rows), 1);
+ bool has_kept_row = false;
+ for (size_t row = 0; row < static_cast<size_t>(batch_rows); ++row) {
+ keep_filter[row] = !delete_filter[row];
+ has_kept_row |= keep_filter[row] != 0;
+ }
+ file_block->erase(result_column_id);
*selected_rows =
- can_filter_all ? 0 : _apply_filter_to_selection(filter,
selection, *selected_rows);
+ !has_kept_row ? 0
+ : _apply_filter_to_selection(keep_filter,
selection, *selected_rows);
}
return Status::OK();
}
diff --git a/be/src/format/reader/expr/delete_predicate.cpp
b/be/src/format/reader/expr/delete_predicate.cpp
index 01844fa8a07..31c6a057afd 100644
--- a/be/src/format/reader/expr/delete_predicate.cpp
+++ b/be/src/format/reader/expr/delete_predicate.cpp
@@ -69,26 +69,26 @@ void DeletePredicate::close(VExprContext* context,
FunctionContext::FunctionStat
* Row IDs should be generated by file reader as a virtual column in `block`.
**/
Status DeletePredicate::execute(VExprContext* context, Block* block, int*
result_column_id) const {
- if (block->empty()) {
- return Status::OK();
- }
- DCHECK(_open_finished || block == nullptr);
if (_children.size() != 1) {
return Status::InternalError(fmt::format(
"DeletePredicate should have exactly 1 child expr, but got
{}", _children.size()));
}
int slot = -1;
RETURN_IF_ERROR(_children[0]->execute(context, block, &slot));
- const auto count = block->rows();
- auto res_col = ColumnBool::create(block->rows(), 0);
const auto& row_ids =
assert_cast<const
ColumnInt64&>(*block->get_by_position(slot).column).get_data();
- DCHECK_EQ(row_ids.size(), count);
+ const auto count = row_ids.size();
+ auto res_col = ColumnBool::create(count, 0);
if (_deleted_rows.empty()) {
block->insert({std::move(res_col), std::make_shared<DataTypeBool>(),
expr_name()});
*result_column_id = static_cast<int>(block->get_columns().size() - 1);
return Status::OK();
}
+ if (count == 0) {
+ block->insert({std::move(res_col), std::make_shared<DataTypeBool>(),
expr_name()});
+ *result_column_id = static_cast<int>(block->get_columns().size() - 1);
+ return Status::OK();
+ }
const int64_t* delete_rows = _deleted_rows.data();
const int64_t* delete_rows_end = delete_rows + _deleted_rows.size();
const int64_t* start_pos = std::lower_bound(delete_rows, delete_rows_end,
row_ids[0]);
diff --git a/be/src/format/reader/table/paimon_reader.cpp
b/be/src/format/reader/table/paimon_reader.cpp
index 713d1a97e68..d5c450b2c01 100644
--- a/be/src/format/reader/table/paimon_reader.cpp
+++ b/be/src/format/reader/table/paimon_reader.cpp
@@ -17,26 +17,39 @@
#include "format/reader/table/paimon_reader.h"
+#include <cstring>
+#include <string>
+
#include "format/table/deletion_vector_reader.h"
namespace doris::paimon {
-bool PaimonReader::_parse_delete_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc& desc) {
+Status PaimonReader::_parse_deletion_vector_file(const TTableFormatFileDesc&
t_desc,
+ DeleteFileDesc* desc, bool*
has_delete_file) {
+ DORIS_CHECK(desc != nullptr);
+ DORIS_CHECK(has_delete_file != nullptr);
+ *has_delete_file = false;
const auto& table_desc = t_desc.paimon_params;
if (!table_desc.__isset.deletion_file) {
- return false;
+ return Status::OK();
}
const auto& deletion_file = table_desc.deletion_file;
- desc.key.resize(deletion_file.path.size() + sizeof(deletion_file.offset));
- memcpy(desc.key.data(), deletion_file.path.data(),
deletion_file.path.size());
- memcpy(desc.key.data() + deletion_file.path.size(), &deletion_file.offset,
- sizeof(deletion_file.offset));
- desc.path = deletion_file.path;
- desc.start_offset = deletion_file.offset;
- desc.size = deletion_file.length + 4;
- desc.file_size = -1;
- return true;
+ const std::string key_prefix = "paimon_dv:";
+ desc->key.resize(key_prefix.size() + deletion_file.path.size() +
sizeof(deletion_file.offset));
+ char* key_data = desc->key.data();
+ memcpy(key_data, key_prefix.data(), key_prefix.size());
+ key_data += key_prefix.size();
+ memcpy(key_data, deletion_file.path.data(), deletion_file.path.size());
+ key_data += deletion_file.path.size();
+ memcpy(key_data, &deletion_file.offset, sizeof(deletion_file.offset));
+ desc->path = deletion_file.path;
+ desc->start_offset = deletion_file.offset;
+ desc->size = deletion_file.length + 4;
+ desc->file_size = -1;
+ desc->format = DeleteFileDesc::Format::PAIMON;
+ *has_delete_file = true;
+ return Status::OK();
}
} // namespace doris::paimon
diff --git a/be/src/format/reader/table/paimon_reader.h
b/be/src/format/reader/table/paimon_reader.h
index d0f33c7a90c..ce386460a6e 100644
--- a/be/src/format/reader/table/paimon_reader.h
+++ b/be/src/format/reader/table/paimon_reader.h
@@ -30,7 +30,8 @@ public:
~PaimonReader() final = default;
protected:
- bool _parse_delete_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc& desc) override;
+ Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc* desc,
+ bool* has_delete_file) override;
};
} // namespace doris::paimon
diff --git a/be/src/format/reader/table_reader.cpp
b/be/src/format/reader/table_reader.cpp
index 86868b97b0b..0735cc51f38 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -20,16 +20,21 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
+#include <cstring>
+#include <stdexcept>
#include <set>
#include <vector>
+#include "common/cast_set.h"
#include "common/status.h"
#include "core/assert_cast.h"
+#include "exec/common/endian.h"
#include "exprs/vslot_ref.h"
#include "format/new_parquet/parquet_reader.h"
#include "format/reader/column_mapper.h"
#include "format/table/deletion_vector_reader.h"
#include "io/io_common.h"
+#include "roaring/roaring64map.hh"
namespace doris::reader {
namespace {
@@ -66,10 +71,63 @@ void build_table_filters_from_conjunct(const VExprSPtr&
conjunct,
table_filter.conjunct = VExprContext::create_shared(conjunct);
table_filter.slot_ids.assign(slot_ids.begin(), slot_ids.end());
table_filters->push_back(std::move(table_filter));
- return;
}
}
+Status parse_deletion_vector(const char* buf, size_t buffer_size,
DeleteFileDesc::Format format,
+ DeleteRows* delete_rows) {
+ DORIS_CHECK(buf != nullptr);
+ DORIS_CHECK(delete_rows != nullptr);
+ DORIS_CHECK(format == DeleteFileDesc::Format::PAIMON ||
+ format == DeleteFileDesc::Format::ICEBERG);
+
+ const size_t checksum_size = format == DeleteFileDesc::Format::ICEBERG ? 4
: 0;
+ if (buffer_size < 8 + checksum_size) [[unlikely]] {
+ return Status::DataQualityError("Deletion vector file size too small:
{}", buffer_size);
+ }
+
+ auto total_length = BigEndian::Load32(buf);
+ if (total_length + 4 + checksum_size != buffer_size) [[unlikely]] {
+ return Status::DataQualityError("Deletion vector length mismatch,
expected: {}, actual: {}",
+ total_length + 4 + checksum_size,
buffer_size);
+ }
+
+ constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
+ if (memcmp(buf + sizeof(total_length), MAGIC_NUMBER, 4) != 0) [[unlikely]]
{
+ return Status::DataQualityError("Deletion vector magic number
mismatch");
+ }
+
+ const char* bitmap_buf = buf + 8;
+ const size_t bitmap_size = buffer_size - 8 - checksum_size;
+ if (format == DeleteFileDesc::Format::PAIMON) {
+ roaring::Roaring bitmap;
+ try {
+ bitmap = roaring::Roaring::readSafe(bitmap_buf, bitmap_size);
+ } catch (const std::runtime_error& e) {
+ return Status::DataQualityError("Decode roaring bitmap failed,
{}", e.what());
+ }
+
+ delete_rows->reserve(bitmap.cardinality());
+ for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
+ delete_rows->push_back(*it);
+ }
+ return Status::OK();
+ }
+
+ roaring::Roaring64Map bitmap;
+ try {
+ bitmap = roaring::Roaring64Map::readSafe(bitmap_buf, bitmap_size);
+ } catch (const std::runtime_error& e) {
+ return Status::DataQualityError("Decode roaring bitmap failed, {}",
e.what());
+ }
+
+ delete_rows->reserve(bitmap.cardinality());
+ for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
+ delete_rows->push_back(cast_set<int64_t>(*it));
+ }
+ return Status::OK();
+}
+
} // namespace
std::shared_ptr<io::FileSystemProperties> create_system_properties(
@@ -117,10 +175,17 @@ Status TableReader::_open_local_filter_exprs(const
FileScanRequest& file_request
RowDescriptor row_desc;
for (const auto& expression_filter : file_request.expression_filters) {
if (expression_filter.conjunct == nullptr) {
- continue;
+ if (expression_filter.delete_conjunct == nullptr) {
+ continue;
+ }
+ } else {
+
RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state, row_desc));
+ RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state));
+ }
+ if (expression_filter.delete_conjunct != nullptr) {
+
RETURN_IF_ERROR(expression_filter.delete_conjunct->prepare(_runtime_state,
row_desc));
+
RETURN_IF_ERROR(expression_filter.delete_conjunct->open(_runtime_state));
}
- RETURN_IF_ERROR(expression_filter.conjunct->prepare(_runtime_state,
row_desc));
- RETURN_IF_ERROR(expression_filter.conjunct->open(_runtime_state));
}
return Status::OK();
}
@@ -169,12 +234,17 @@ Status TableReader::prepare_split(const SplitReadOptions&
options) {
_partition_values = std::move(options.partition_values);
_current_task = std::make_unique<ScanTask>();
_current_task->data_file = create_file_description(options.current_range);
+ _delete_rows = nullptr;
return _parse_delete_predicates(options);
}
Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) {
DeleteFileDesc desc {.fs_name = options.current_range.fs_name};
- if (_parse_delete_file(options.current_range.table_format_params, desc)) {
+ bool has_delete_file = false;
+
RETURN_IF_ERROR(_parse_deletion_vector_file(options.current_range.table_format_params,
&desc,
+ &has_delete_file));
+ if (has_delete_file) {
+ DORIS_CHECK(options.cache != nullptr);
Status create_status = Status::OK();
_delete_rows = options.cache->get<DeleteRows>(desc.key, [&]() ->
DeleteRows* {
@@ -195,45 +265,11 @@ Status TableReader::_parse_delete_predicates(const
SplitReadOptions& options) {
}
const char* buf = buffer.data();
- uint32_t actual_length;
- std::memcpy(reinterpret_cast<char*>(&actual_length), buf, 4);
- std::reverse(reinterpret_cast<char*>(&actual_length),
- reinterpret_cast<char*>(&actual_length) + 4);
- buf += 4;
- if (actual_length != bytes_read - 4) [[unlikely]] {
- create_status = Status::RuntimeError(
- "DeletionVector deserialize error: length not match, "
- "actual length: {}, expect length: {}",
- actual_length, bytes_read - 4);
- return nullptr;
- }
- uint32_t magic_number;
- std::memcpy(reinterpret_cast<char*>(&magic_number), buf, 4);
- std::reverse(reinterpret_cast<char*>(&magic_number),
- reinterpret_cast<char*>(&magic_number) + 4);
- buf += 4;
- const static uint32_t MAGIC_NUMBER = 1581511376;
- if (magic_number != MAGIC_NUMBER) [[unlikely]] {
- create_status = Status::RuntimeError(
- "DeletionVector deserialize error: invalid magic
number {}", magic_number);
- return nullptr;
- }
-
- roaring::Roaring roaring_bitmap;
SCOPED_TIMER(_profile->parse_delete_file_time);
- try {
- roaring_bitmap = roaring::Roaring::readSafe(buf, bytes_read -
4);
- } catch (const std::runtime_error& e) {
- create_status = Status::RuntimeError(
- "DeletionVector deserialize error: failed to
deserialize roaring bitmap, "
- "{}",
- e.what());
+ create_status = parse_deletion_vector(buf, bytes_read,
desc.format, delete_rows);
+ if (!create_status.ok()) [[unlikely]] {
return nullptr;
}
- delete_rows->reserve(roaring_bitmap.cardinality());
- for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end();
it++) {
- delete_rows->push_back(*it);
- }
COUNTER_UPDATE(_profile->num_delete_rows, delete_rows->size());
return delete_rows;
});
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index 5441995e18c..f94e98bd837 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -19,12 +19,14 @@
#include <bvar/status.h>
+#include <algorithm>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
+#include "common/cast_set.h"
#include "common/status.h"
#include "core/assert_cast.h"
#include "core/block/block.h"
@@ -32,11 +34,14 @@
#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_map.h"
#include "core/data_type/data_type_nullable.h"
+#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_struct.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
+#include "format/new_parquet/column_reader.h"
#include "format/reader/column_mapper.h"
#include "format/reader/expr/delete_predicate.h"
+#include "format/reader/expr/slot_ref.h"
#include "format/reader/file_reader.h"
#include "runtime/descriptors.h"
@@ -179,19 +184,9 @@ public:
}
continue;
}
- DCHECK_EQ(_data_reader.block_template.columns(),
_data_reader.scan_schema.size());
-
+ DCHECK_EQ(_data_reader.block_template.columns(),
_data_reader.block_schema.size());
DORIS_CHECK(block->columns() ==
_data_reader.column_mapper.mappings().size());
- size_t idx = 0;
- for (const auto& mapping : _data_reader.column_mapper.mappings()) {
- ColumnPtr column;
- RETURN_IF_ERROR(_materialize_mapping_column(mapping,
&_data_reader.block_template,
- current_rows,
&column));
- block->replace_by_position(idx, std::move(column));
- idx++;
- }
- RETURN_IF_ERROR(finalize_chunk(block));
- RETURN_IF_ERROR(materialize_virtual_columns(block));
+ RETURN_IF_ERROR(finalize_chunk(block, current_rows));
if (current_eof) {
RETURN_IF_ERROR(close_current_reader());
}
@@ -209,9 +204,13 @@ public:
}
protected:
- virtual bool _parse_delete_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc& desc) {
- return false;
+ // Parse deletion vector information from table format specific file
description.
+ virtual Status _parse_deletion_vector_file(const TTableFormatFileDesc&
t_desc,
+ DeleteFileDesc* desc, bool*
has_delete_file) {
+ *has_delete_file = false;
+ return Status::OK();
}
+
// 切换到下一个 reader 的通用流程。
// 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。
Status create_next_reader(bool* eos);
@@ -219,36 +218,45 @@ protected:
// 打开当前具体 reader。
// 子类在这里基于当前 split/task 初始化底层 FileReader。
virtual Status open_reader() {
+ // 1. Get file schema and create column mapping.
std::vector<SchemaField> file_schema;
RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema));
- _data_reader.block_schema = file_schema;
+ _data_reader.file_schema = file_schema;
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_projected_columns,
_partition_values, file_schema));
DORIS_CHECK(_data_reader.column_mapper.mappings().size() ==
_projected_columns.size());
+
+ // 2. Build table filters based on conjuncts and column predicates.
RETURN_IF_ERROR(_build_table_filters_from_conjuncts());
+ // 3. Create file scan request based on column mapping and table
filters, then open file reader with the request.
+ // file scan request is the main carrier of file-level pruning
information, including column mapping, column-level filters and expression
filters. The file reader will evaluate the filters and only return rows that
satisfy the filters to table reader.
auto file_request = std::make_unique<FileScanRequest>();
RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
_table_filters, _table_column_predicates, _projected_columns,
file_request.get()));
RETURN_IF_ERROR(customize_file_scan_request(file_request.get()));
RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
- _data_reader.scan_schema.clear();
+ _data_reader.block_schema.clear();
_data_reader.block_template.clear();
- _data_reader.scan_schema.resize(file_request->column_positions.size());
+
_data_reader.block_schema.resize(file_request->column_positions.size());
+
+ // 4. Build block schema based on file schema and column mapping. The
block schema describes the column layout of the block returned by file reader,
which is determined by the column mapping and file schema.
for (const auto& [file_column_id, block_position] :
file_request->column_positions) {
- DORIS_CHECK(block_position < _data_reader.scan_schema.size());
- const auto* field = _find_schema_field(_data_reader.block_schema,
file_column_id);
+ DORIS_CHECK(block_position < _data_reader.block_schema.size());
+ const auto* field = _find_schema_field(_data_reader.file_schema,
file_column_id);
DORIS_CHECK(field != nullptr);
auto projection_it =
file_request->complex_projections.find(file_column_id);
if (projection_it == file_request->complex_projections.end()) {
- _data_reader.scan_schema[block_position] = *field;
+ _data_reader.block_schema[block_position] = *field;
} else {
RETURN_IF_ERROR(_project_schema_field(*field,
projection_it->second,
-
&_data_reader.scan_schema[block_position]));
+
&_data_reader.block_schema[block_position]));
}
}
- _data_reader.block_template.reserve(_data_reader.scan_schema.size());
- for (const auto& field : _data_reader.scan_schema) {
+
+ // 5. Prepare block template based on block schema. The block template
is used to store the block returned by file reader before finalize; it has the
same column layout as the file reader output block, which is determined by the
column mapping and file schema.
+ _data_reader.block_template.reserve(_data_reader.block_schema.size());
+ for (const auto& field : _data_reader.block_schema) {
_data_reader.block_template.insert(
{field.type->create_column(), field.type, field.name});
}
@@ -261,6 +269,68 @@ protected:
Status _open_local_filter_exprs(const FileScanRequest& file_request);
virtual Status customize_file_scan_request(FileScanRequest* file_request) {
+ return _append_delete_predicate(file_request);
+ }
+
+ static size_t _next_block_position(const FileScanRequest& request) {
+ size_t next_position = 0;
+ for (const auto& [_, block_position] : request.column_positions) {
+ next_position = std::max(next_position, block_position + 1);
+ }
+ return next_position;
+ }
+
+ void _append_file_scan_column(FileScanRequest* request, ColumnId column_id,
+ std::vector<ColumnId>* scan_columns) {
+ DORIS_CHECK(request != nullptr);
+ DORIS_CHECK(scan_columns != nullptr);
+ if (scan_columns == &request->non_predicate_columns &&
+ std::find(request->predicate_columns.begin(),
request->predicate_columns.end(),
+ column_id) != request->predicate_columns.end()) {
+ return;
+ }
+ const bool newly_added = request->column_positions.count(column_id) ==
0;
+ if (newly_added) {
+ request->column_positions.emplace(column_id,
_next_block_position(*request));
+ scan_columns->push_back(column_id);
+ } else if (std::find(scan_columns->begin(), scan_columns->end(),
column_id) ==
+ scan_columns->end()) {
+ scan_columns->push_back(column_id);
+ }
+ if (scan_columns == &request->predicate_columns) {
+ request->non_predicate_columns.erase(
+ std::remove(request->non_predicate_columns.begin(),
+ request->non_predicate_columns.end(),
column_id),
+ request->non_predicate_columns.end());
+ }
+ if (column_id ==
doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID &&
+ _find_schema_field(_data_reader.file_schema, column_id) ==
nullptr) {
+ _data_reader.file_schema.push_back(
+
doris::parquet::ParquetColumnReaderFactory::row_position_schema_field());
+ }
+ }
+
+ // Append DeletePredicate to file scan request if there are deletes. The
predicate is evaluated at the file reader level and filters out deleted rows
before data is returned to the table reader.
+ Status _append_delete_predicate(FileScanRequest* request) {
+ DORIS_CHECK(request != nullptr);
+ if (_delete_rows == nullptr || _delete_rows->empty()) {
+ return Status::OK();
+ }
+ const auto row_position_column_id =
+ parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID;
+ _append_file_scan_column(request, row_position_column_id,
&request->predicate_columns);
+
+ auto delete_predicate =
std::make_shared<DeletePredicate>(*_delete_rows);
+ const auto block_position =
request->column_positions.at(row_position_column_id);
+ delete_predicate->add_child(TableSlotRef::create_shared(
+ cast_set<int>(block_position), cast_set<int>(block_position),
-1,
+ std::make_shared<DataTypeInt64>(),
+
parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME));
+
+ FileExpressionFilter delete_filter;
+ delete_filter.delete_conjunct =
VExprContext::create_shared(std::move(delete_predicate));
+ delete_filter.file_column_ids.push_back(row_position_column_id);
+ request->expression_filters.push_back(std::move(delete_filter));
return Status::OK();
}
@@ -272,27 +342,32 @@ protected:
_data_reader.column_mapper.clear();
_table_filters.clear();
_table_column_predicates.clear();
+ _data_reader.file_schema.clear();
_data_reader.block_schema.clear();
- _data_reader.scan_schema.clear();
_data_reader.block_template.clear();
_current_task.reset();
return Status::OK();
}
- // 将 file-local block 转换为 table/global schema block。
- // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
- // 物化以及复杂列 remap。
- virtual Status finalize_chunk(Block* block) { return Status::OK(); }
-
- // 物化虚拟列。
- // 例如 _row_id、_last_updated_sequence_number 等,它们不来自文件物理列。
- virtual Status materialize_virtual_columns(Block* table_block) {
- // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
+ // Finalize file-local block to table/global schema block.
+ virtual Status finalize_chunk(Block* block, const size_t rows) {
+ size_t idx = 0;
+ for (const auto& mapping : _data_reader.column_mapper.mappings()) {
+ ColumnPtr column;
+ RETURN_IF_ERROR(_materialize_mapping_column(mapping,
&_data_reader.block_template, rows,
+ &column));
+ block->replace_by_position(idx, std::move(column));
+ idx++;
+ }
+ RETURN_IF_ERROR(materialize_virtual_columns(block));
return Status::OK();
}
+ // Materialize virtual columns in table block, such as _row_id and
_last_updated_sequence_number in Iceberg. This is called after finalize_chunk,
so the virtual column can be referenced in finalize_expr.
+ virtual Status materialize_virtual_columns(Block* table_block) { return
Status::OK(); }
+
Status _materialize_mapping_column(const ColumnMapping& mapping, Block*
current_block,
- size_t current_rows, ColumnPtr* column)
{
+ const size_t rows, ColumnPtr* column) {
if (mapping.projection != nullptr) {
int res_id;
RETURN_IF_ERROR(mapping.projection->execute(current_block,
&res_id));
@@ -300,23 +375,22 @@ protected:
return Status::OK();
}
if (mapping.default_expr != nullptr) {
- if (current_block->rows() == current_rows) {
+ if (current_block->rows() == rows) {
int res_id;
RETURN_IF_ERROR(mapping.default_expr->execute(current_block,
&res_id));
*column = current_block->get_columns()[res_id];
} else {
DORIS_CHECK(mapping.is_constant);
Block eval_block;
- eval_block.insert(
-
{mapping.table_type->create_column_const_with_default_value(current_rows),
- mapping.table_type, "__table_reader_const_rows"});
+
eval_block.insert({mapping.table_type->create_column_const_with_default_value(rows),
+ mapping.table_type,
"__table_reader_const_rows"});
int res_id;
RETURN_IF_ERROR(mapping.default_expr->execute(&eval_block,
&res_id));
*column = eval_block.get_columns()[res_id];
}
return Status::OK();
}
- *column =
mapping.table_type->create_column_const_with_default_value(current_rows);
+ *column =
mapping.table_type->create_column_const_with_default_value(rows);
return Status::OK();
}
@@ -338,8 +412,10 @@ protected:
struct DataReader {
std::unique_ptr<FileReader> reader;
TableColumnMapper column_mapper;
- std::vector<SchemaField> block_schema;
- std::vector<SchemaField> scan_schema;
+ std::vector<SchemaField>
+ file_schema; // Schema of the data file, also including
virtual column (row position).
+ std::vector<SchemaField>
+ block_schema; // Schema of the block returned by file reader,
determined by column mapping and file schema. It is used for file reader to
materialize columns into correct type and position.
Block block_template;
};
DataReader _data_reader;
@@ -352,8 +428,8 @@ protected:
TableColumnPredicates _table_column_predicates;
VExprContext _conjuncts {nullptr};
std::unique_ptr<ReadProfile> _profile;
- // Parsed from DELETION_VECTOR in Iceberg and Paimon
- DeleteRows* _delete_rows;
+ // Parsed from row-position based delete files, including position delete
and deletion vector.
+ DeleteRows* _delete_rows = nullptr;
TFileScanRangeParams* _scan_params;
std::shared_ptr<io::IOContext> _io_ctx;
RuntimeState* _runtime_state;
@@ -451,6 +527,7 @@ private:
return Status::OK();
}
+ // Parse row-position deletes from table format specific parameters, and
fill in _delete_rows.
Status _parse_delete_predicates(const SplitReadOptions& options);
};
diff --git a/be/src/format/table/deletion_vector_reader.h
b/be/src/format/table/deletion_vector_reader.h
index b030f048415..968344a8496 100644
--- a/be/src/format/table/deletion_vector_reader.h
+++ b/be/src/format/table/deletion_vector_reader.h
@@ -37,6 +37,11 @@ struct IOContext;
namespace doris {
struct DeleteFileDesc {
+ enum class Format {
+ PAIMON,
+ ICEBERG,
+ };
+
std::string key = "";
std::string path = "";
std::string fs_name = "";
@@ -44,6 +49,7 @@ struct DeleteFileDesc {
int64_t size = 0;
int64_t file_size = -1;
int64_t modification_time = 0;
+ Format format = Format::PAIMON;
};
class DeletionVectorReader {
diff --git a/be/src/format/table/iceberg_reader_v2.cpp
b/be/src/format/table/iceberg_reader_v2.cpp
index 220f153e93f..ad72313cc89 100644
--- a/be/src/format/table/iceberg_reader_v2.cpp
+++ b/be/src/format/table/iceberg_reader_v2.cpp
@@ -17,4 +17,398 @@
#include "format/table/iceberg_reader_v2.h"
-namespace doris::iceberg {} // namespace doris::iceberg
+#include <algorithm>
+#include <cstring>
+#include <memory>
+#include <utility>
+
+#include "core/assert_cast.h"
+#include "core/block/block.h"
+#include "core/column/column_const.h"
+#include "core/column/column_nullable.h"
+#include "core/column/column_string.h"
+#include "core/column/column_vector.h"
+#include "core/data_type/data_type_number.h"
+#include "core/data_type/define_primitive_type.h"
+#include "core/field.h"
+#include "format/new_parquet/column_reader.h"
+#include "format/new_parquet/parquet_reader.h"
+#include "format/reader/table_reader.h"
+#include "format/table/deletion_vector_reader.h"
+#include "io/file_factory.h"
+
+namespace doris::iceberg {
+
+// Binds the collector to the data file whose deletes are wanted (`data_file_path`, moved into
+// the collector) and to the map that receives the matching row positions. Only the pointer to
+// `rows` is stored; the map itself is not copied.
+IcebergTableReader::PositionDeleteBlockCollector::PositionDeleteBlockCollector(
+ std::string data_file_path, std::map<std::string, reader::DeleteRows>*
rows)
+ : _data_file_path(std::move(data_file_path)), _rows(rows) {}
+
+// Appends to the output map the row positions of every position-delete record in `block`
+// whose file_path equals the data file this collector was created for.
+//
+// `block` must hold the file_path column at ICEBERG_FILE_PATH_BLOCK_POSITION and the pos
+// column at ICEBERG_ROW_POS_BLOCK_POSITION; only the first `read_rows` rows are inspected.
+// Records that belong to other data files are ignored. Always returns Status::OK().
+Status IcebergTableReader::PositionDeleteBlockCollector::collect(const Block& block,
+                                                                 size_t read_rows) {
+    if (read_rows == 0) {
+        return Status::OK();
+    }
+    const auto& file_path_column = assert_cast<const ColumnString&>(
+            *block.get_by_position(ICEBERG_FILE_PATH_BLOCK_POSITION).column);
+    const auto& pos_column = assert_cast<const ColumnInt64&>(
+            *block.get_by_position(ICEBERG_ROW_POS_BLOCK_POSITION).column);
+    const std::string_view wanted_path(_data_file_path);
+    // Resolved lazily so the map gains an entry only once a matching row has been seen,
+    // and so the map lookup happens once per block instead of once per row.
+    reader::DeleteRows* matched_rows = nullptr;
+    for (size_t row = 0; row < read_rows; ++row) {
+        // Compare the column bytes in place rather than materializing a std::string per row.
+        const auto path_ref = file_path_column.get_data_at(row);
+        if (std::string_view(path_ref.data, path_ref.size) != wanted_path) {
+            continue;
+        }
+        if (matched_rows == nullptr) {
+            matched_rows = &(*_rows)[_data_file_path];
+        }
+        matched_rows->push_back(pos_column.get_element(row));
+    }
+    return Status::OK();
+}
+
+// Prepares the reader for a new split: clears the per-split Iceberg state, records the
+// split's Iceberg parameters (row-lineage values plus a pointer to the parameters), runs the
+// base-class preparation and finally collects the row-position deletes of the split.
+// NOTE(review): _iceberg_params points into `options`; presumably the caller keeps the split
+// options alive while the split is being read -- confirm.
+Status IcebergTableReader::prepare_split(const reader::SplitReadOptions& options) {
+    // Forget everything the previous split left behind.
+    _row_lineage_columns = {};
+    _iceberg_params = nullptr;
+    _delete_predicates_initialized = false;
+    _position_delete_rows_storage.clear();
+    _equality_delete_files.clear();
+
+    const auto& range = options.current_range;
+    const bool has_iceberg_params =
+            range.__isset.table_format_params && range.table_format_params.__isset.iceberg_params;
+    if (has_iceberg_params) {
+        const auto& params = range.table_format_params.iceberg_params;
+        _iceberg_params = &params;
+        // Row-lineage values keep their "unset" default (-1) unless the split provides them.
+        if (params.__isset.first_row_id) {
+            _row_lineage_columns.first_row_id = params.first_row_id;
+        }
+        if (params.__isset.last_updated_sequence_number) {
+            _row_lineage_columns.last_updated_sequence_number =
+                    params.last_updated_sequence_number;
+        }
+    }
+
+    RETURN_IF_ERROR(TableReader::prepare_split(options));
+    return _collect_position_delete_rows(range.table_format_params);
+}
+
+// Runs the base-class finalization of the chunk and then the equality-delete step on the
+// same block. The first failing step determines the returned status.
+Status IcebergTableReader::finalize_chunk(Block* block, const size_t rows) {
+    RETURN_IF_ERROR(reader::TableReader::finalize_chunk(block, rows));
+    return apply_equality_deletes(block);
+}
+
+// Walks the column mappings and fills the Iceberg virtual columns of `table_block`: ROW_ID
+// and LAST_UPDATED_SEQUENCE_NUMBER are materialized at the index of their mapping; every
+// other mapping is left untouched.
+Status IcebergTableReader::materialize_virtual_columns(Block* table_block) {
+    const auto& mappings = _data_reader.column_mapper.mappings();
+    for (size_t column_idx = 0; column_idx < mappings.size(); ++column_idx) {
+        const auto virtual_type = mappings[column_idx].virtual_column_type;
+        if (virtual_type == reader::TableVirtualColumnType::ROW_ID) {
+            RETURN_IF_ERROR(_materialize_row_lineage_row_id(table_block, column_idx));
+        } else if (virtual_type == reader::TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER) {
+            RETURN_IF_ERROR(
+                    _materialize_row_lineage_last_updated_sequence_number(table_block, column_idx));
+        }
+        // Any other type (including INVALID) is not a virtual column handled here.
+    }
+    return Status::OK();
+}
+
+// Applies the base-class customization and, when the split carries a first_row_id and a
+// ROW_ID virtual column is mapped, additionally requests the row-position column from the
+// file reader.
+Status IcebergTableReader::customize_file_scan_request(reader::FileScanRequest* file_request) {
+    RETURN_IF_ERROR(TableReader::customize_file_scan_request(file_request));
+    const bool wants_row_id =
+            _row_lineage_columns.first_row_id >= 0 && _need_row_lineage_row_id();
+    if (!wants_row_id) {
+        return Status::OK();
+    }
+    return _append_row_position_output_column(file_request);
+}
+
+// Looks for the deletion vector attached to the split described by `t_desc` and, when one
+// exists, describes it in `desc`.
+//
+// `*has_delete_file` is set to true only when `desc` has been filled. It stays false (with an
+// OK status) when the split has no Iceberg parameters, the format version is below
+// MIN_SUPPORT_DELETE_FILES_VERSION, there are no delete files, or none of them is a deletion
+// vector. Returns DataQualityError when more than one deletion vector is listed and
+// InternalError when the deletion vector lacks its content offset or size.
+Status IcebergTableReader::_parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
+                                                       DeleteFileDesc* desc,
+                                                       bool* has_delete_file) {
+    DORIS_CHECK(desc != nullptr);
+    DORIS_CHECK(has_delete_file != nullptr);
+    *has_delete_file = false;
+    if (!t_desc.__isset.iceberg_params) {
+        return Status::OK();
+    }
+
+    const auto& params = t_desc.iceberg_params;
+    const bool version_supports_deletes =
+            params.__isset.format_version &&
+            params.format_version >= MIN_SUPPORT_DELETE_FILES_VERSION;
+    const bool lists_delete_files = params.__isset.delete_files && !params.delete_files.empty();
+    if (!version_supports_deletes || !lists_delete_files) {
+        return Status::OK();
+    }
+
+    // At most one deletion vector may be attached to a data file.
+    const TIcebergDeleteFileDesc* dv = nullptr;
+    for (const auto& candidate : params.delete_files) {
+        const bool is_deletion_vector =
+                candidate.__isset.content && candidate.content == DELETION_VECTOR;
+        if (!is_deletion_vector) {
+            continue;
+        }
+        if (dv != nullptr) {
+            return Status::DataQualityError("This iceberg data file has multiple DVs.");
+        }
+        dv = &candidate;
+    }
+    if (dv == nullptr) {
+        return Status::OK();
+    }
+
+    const bool has_content_range =
+            dv->__isset.content_offset && dv->__isset.content_size_in_bytes;
+    if (!has_content_range) {
+        return Status::InternalError("Deletion vector is missing content offset or length");
+    }
+
+    desc->key = _iceberg_delete_vector_cache_key(*dv);
+    desc->path = dv->path;
+    desc->start_offset = dv->content_offset;
+    desc->size = dv->content_size_in_bytes;
+    desc->file_size = -1;
+    desc->format = DeleteFileDesc::Format::ICEBERG;
+    *has_delete_file = true;
+    return Status::OK();
+}
+
+// Sorts the delete files of the split into position deletes and equality deletes, then merges
+// the rows of the position delete files into _position_delete_rows_storage.
+//
+// Equality delete files are only remembered in _equality_delete_files. Rows already referenced
+// by _delete_rows are first copied into the member storage so that later merging extends one
+// vector. _delete_predicates_initialized is set on every successful exit; it is left unchanged
+// when reading the position delete files fails.
+Status IcebergTableReader::_collect_position_delete_rows(const TTableFormatFileDesc& t_desc) {
+    if (_delete_predicates_initialized) {
+        return Status::OK();
+    }
+    if (!t_desc.__isset.iceberg_params) {
+        _delete_predicates_initialized = true;
+        return Status::OK();
+    }
+
+    const auto& params = t_desc.iceberg_params;
+    const bool version_supports_deletes =
+            params.__isset.format_version &&
+            params.format_version >= MIN_SUPPORT_DELETE_FILES_VERSION;
+    const bool lists_delete_files = params.__isset.delete_files && !params.delete_files.empty();
+    if (!version_supports_deletes || !lists_delete_files) {
+        _delete_predicates_initialized = true;
+        return Status::OK();
+    }
+
+    std::vector<TIcebergDeleteFileDesc> position_delete_files;
+    for (const auto& candidate : params.delete_files) {
+        if (!candidate.__isset.content) {
+            continue;
+        }
+        if (candidate.content == EQUALITY_DELETE) {
+            _equality_delete_files.push_back(candidate);
+            continue;
+        }
+        if (candidate.content == POSITION_DELETE) {
+            position_delete_files.push_back(candidate);
+        }
+    }
+
+    // Take over any rows that were parsed earlier so all row-position deletes share one vector.
+    if (_delete_rows != nullptr) {
+        _position_delete_rows_storage = *_delete_rows;
+        _delete_rows = &_position_delete_rows_storage;
+    }
+    if (!position_delete_files.empty()) {
+        RETURN_IF_ERROR(_read_position_delete_files(position_delete_files));
+    }
+
+    _delete_predicates_initialized = true;
+    return Status::OK();
+}
+
+// Equality deletes are not implemented: the call succeeds only when the current split has no
+// equality delete files and otherwise reports NotSupported. `block` is not inspected.
+Status IcebergTableReader::apply_equality_deletes(Block* block) {
+    if (_equality_delete_files.empty()) {
+        return Status::OK();
+    }
+    return Status::NotSupported("Iceberg equality delete is not supported by TableReader");
+}
+
+// Builds the cache key of a deletion vector: the literal prefix "iceberg_dv:", the delete file
+// path, then the in-memory bytes of content_offset and content_size_in_bytes. The integers are
+// appended as raw bytes, so the key is a binary string rather than printable text.
+std::string IcebergTableReader::_iceberg_delete_vector_cache_key(
+        const TIcebergDeleteFileDesc& delete_file) {
+    const std::string prefix = "iceberg_dv:";
+    const auto* offset_bytes = reinterpret_cast<const char*>(&delete_file.content_offset);
+    const auto* size_bytes = reinterpret_cast<const char*>(&delete_file.content_size_in_bytes);
+
+    std::string cache_key;
+    cache_key.reserve(prefix.size() + delete_file.path.size() +
+                      sizeof(delete_file.content_offset) +
+                      sizeof(delete_file.content_size_in_bytes));
+    cache_key.append(prefix);
+    cache_key.append(delete_file.path);
+    cache_key.append(offset_bytes, sizeof(delete_file.content_offset));
+    cache_key.append(size_bytes, sizeof(delete_file.content_size_in_bytes));
+    return cache_key;
+}
+
+// Derives the file system properties used to open delete files from the scan parameters.
+// The file type falls back to TFileType::FILE_LOCAL when the scan parameters do not set one;
+// broker addresses are copied only when present.
+std::shared_ptr<io::FileSystemProperties> IcebergTableReader::_delete_file_system_properties(
+        const TFileScanRangeParams& scan_params) {
+    auto props = std::make_shared<io::FileSystemProperties>();
+    if (scan_params.__isset.file_type) {
+        props->system_type = scan_params.file_type;
+    } else {
+        props->system_type = TFileType::FILE_LOCAL;
+    }
+    props->properties = scan_params.properties;
+    props->hdfs_params = scan_params.hdfs_params;
+    if (scan_params.__isset.broker_addresses) {
+        const auto& brokers = scan_params.broker_addresses;
+        props->broker_addresses.assign(brokers.begin(), brokers.end());
+    }
+    return props;
+}
+
+// Converts a thrift file range into the FileDescription used to open a delete file. Fields the
+// range does not set fall back to: file_size -1, range_start_offset 0, range_size -1; fs_name
+// is copied only when the range sets it.
+std::unique_ptr<io::FileDescription> IcebergTableReader::_delete_file_description(
+        const TFileRangeDesc& range) {
+    auto description = std::make_unique<io::FileDescription>();
+    description->path = range.path;
+
+    description->file_size = -1;
+    if (range.__isset.file_size) {
+        description->file_size = range.file_size;
+    }
+    description->range_start_offset = 0;
+    if (range.__isset.start_offset) {
+        description->range_start_offset = range.start_offset;
+    }
+    description->range_size = -1;
+    if (range.__isset.size) {
+        description->range_size = range.size;
+    }
+    if (range.__isset.fs_name) {
+        description->fs_name = range.fs_name;
+    }
+    return description;
+}
+
+// Returns a pointer to the first field of `schema` whose name equals `name`, or nullptr when
+// there is none. The pointer refers into `schema` and is valid only as long as it is.
+const reader::SchemaField* IcebergTableReader::_find_delete_field(
+        const std::vector<reader::SchemaField>& schema, const std::string& name) {
+    const auto match = std::find_if(
+            schema.begin(), schema.end(),
+            [&name](const reader::SchemaField& candidate) { return candidate.name == name; });
+    return match == schema.end() ? nullptr : &*match;
+}
+
+// Creates the two-column block a position delete file is read into: an empty file_path column
+// followed by an empty pos column, each typed after the corresponding schema field.
+Block IcebergTableReader::_build_position_delete_block(const reader::SchemaField& file_path_field,
+                                                       const reader::SchemaField& pos_field) {
+    Block delete_block;
+    const auto append_empty_column = [&delete_block](const reader::SchemaField& field,
+                                                     const char* column_name) {
+        delete_block.insert({field.type->create_column(), field.type, column_name});
+    };
+    // Insertion order defines the block positions: file_path first, pos second.
+    append_empty_column(file_path_field, ICEBERG_FILE_PATH);
+    append_empty_column(pos_field, ICEBERG_ROW_POS);
+    return delete_block;
+}
+
+// Adds the Parquet row-position column to the non-predicate columns of `request` and records
+// the block position assigned to it in _row_position_block_position.
+//
+// Returns InternalError when the column has no block position after being appended, instead of
+// letting a failed lookup escape as std::out_of_range from a Status-returning function.
+Status IcebergTableReader::_append_row_position_output_column(reader::FileScanRequest* request) {
+    DORIS_CHECK(request != nullptr);
+    const auto row_position_column_id =
+            doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID;
+    _append_file_scan_column(request, row_position_column_id, &request->non_predicate_columns);
+    const auto position_it = request->column_positions.find(row_position_column_id);
+    if (position_it == request->column_positions.end()) {
+        return Status::InternalError(
+                "Row position column has no block position in the file scan request");
+    }
+    _row_position_block_position = position_it->second;
+    return Status::OK();
+}
+
+// Returns the path that position delete records are matched against: the original_file_path
+// of the Iceberg parameters when it is set, otherwise the path of the current task's data
+// file (which must then exist).
+std::string IcebergTableReader::_data_file_path() const {
+    const bool has_original_path =
+            _iceberg_params != nullptr && _iceberg_params->__isset.original_file_path;
+    if (!has_original_path) {
+        DORIS_CHECK(_current_task != nullptr);
+        DORIS_CHECK(_current_task->data_file != nullptr);
+        return _current_task->data_file->path;
+    }
+    return _iceberg_params->original_file_path;
+}
+
+// Reads one Parquet position delete file and feeds every block to `collector`.
+//
+// `delete_file` must declare its file format; ORC and any other non-Parquet format are
+// rejected with NotSupported. The delete file is opened with the file system properties
+// derived from `scan_params` and, when the current data file has one, with the data file's
+// fs_name. Only the file_path and pos columns are requested.
+//
+// Once the reader has been opened it is always closed, also when reading or collecting fails;
+// in that case the read/collect error is returned and the close status is dropped.
+Status IcebergTableReader::_read_parquet_position_delete_file(
+        const TIcebergDeleteFileDesc& delete_file, const TFileScanRangeParams& scan_params,
+        IcebergDeleteFileIOContext* delete_io_ctx, PositionDeleteBlockCollector* collector) {
+    DORIS_CHECK(delete_io_ctx != nullptr);
+    DORIS_CHECK(collector != nullptr);
+    if (!delete_file.__isset.file_format) {
+        return Status::InternalError("Iceberg position delete file is missing file format");
+    }
+    if (delete_file.file_format == TFileFormatType::FORMAT_ORC) {
+        return Status::NotSupported("Iceberg ORC position delete file is not supported");
+    }
+    if (delete_file.file_format != TFileFormatType::FORMAT_PARQUET) {
+        return Status::NotSupported("Unsupported Iceberg delete file format {}",
+                                    delete_file.file_format);
+    }
+
+    auto delete_range = build_iceberg_delete_file_range(delete_file.path);
+    if (_current_task != nullptr && _current_task->data_file != nullptr &&
+        !_current_task->data_file->fs_name.empty()) {
+        delete_range.__set_fs_name(_current_task->data_file->fs_name);
+    }
+    auto system_properties = _delete_file_system_properties(scan_params);
+    auto file_description = _delete_file_description(delete_range);
+    // Non-owning shared_ptr: the IO context belongs to `delete_io_ctx`, so the deleter is a no-op.
+    std::shared_ptr<io::IOContext> io_ctx(&delete_io_ctx->io_ctx, [](io::IOContext*) {});
+    parquet::ParquetReader reader(system_properties, file_description, io_ctx, _scanner_profile);
+    RETURN_IF_ERROR(reader.init(_runtime_state));
+
+    std::vector<reader::SchemaField> schema;
+    RETURN_IF_ERROR(reader.get_schema(&schema));
+    const auto* file_path_field = _find_delete_field(schema, ICEBERG_FILE_PATH);
+    const auto* pos_field = _find_delete_field(schema, ICEBERG_ROW_POS);
+    if (file_path_field == nullptr || pos_field == nullptr) {
+        return Status::InternalError("Position delete parquet file is missing required columns");
+    }
+
+    auto request = std::make_unique<reader::FileScanRequest>();
+    request->non_predicate_columns = {file_path_field->id, pos_field->id};
+    request->column_positions = {
+            {file_path_field->id, ICEBERG_FILE_PATH_BLOCK_POSITION},
+            {pos_field->id, ICEBERG_ROW_POS_BLOCK_POSITION},
+    };
+    RETURN_IF_ERROR(reader.open(request));
+
+    // Keep the first failure instead of returning immediately so the reader is closed below.
+    Status read_status = Status::OK();
+    bool eof = false;
+    while (read_status.ok() && !eof) {
+        // A fresh block per batch: the collector only needs the rows of the current batch.
+        Block block = _build_position_delete_block(*file_path_field, *pos_field);
+        size_t read_rows = 0;
+        read_status = reader.get_block(&block, &read_rows, &eof);
+        if (read_status.ok()) {
+            read_status = collector->collect(block, read_rows);
+        }
+    }
+    Status close_status = reader.close();
+    RETURN_IF_ERROR(read_status);
+    return close_status;
+}
+
+// Reads all position delete files of the current split and merges the rows that target the
+// current data file into _position_delete_rows_storage (sorted, duplicates removed).
+//
+// When at least one matching row was found, _delete_rows is pointed at the member storage;
+// otherwise both are left as they were. The first failing delete file aborts the merge and its
+// status is returned.
+Status IcebergTableReader::_read_position_delete_files(
+        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
+    // Borrow the scan params when they exist; the thrift struct is only read, so copying it
+    // for every split is unnecessary. A default-constructed instance stands in when absent.
+    const TFileScanRangeParams default_scan_params;
+    const TFileScanRangeParams& delete_scan_params =
+            _scan_params != nullptr ? *_scan_params : default_scan_params;
+    std::map<std::string, reader::DeleteRows> rows_by_file;
+    const auto data_file_path = _data_file_path();
+    IcebergDeleteFileIOContext delete_io_ctx(_runtime_state);
+    PositionDeleteBlockCollector collector(data_file_path, &rows_by_file);
+    for (const auto& delete_file : delete_files) {
+        RETURN_IF_ERROR(_read_parquet_position_delete_file(delete_file, delete_scan_params,
+                                                           &delete_io_ctx, &collector));
+    }
+    const auto rows_it = rows_by_file.find(data_file_path);
+    if (rows_it == rows_by_file.end()) {
+        return Status::OK();
+    }
+    // Position delete files and deletion vectors both become row-position deletes for the
+    // common TableReader DeletePredicate path. Keep the merged rows in a member vector because
+    // DeletePredicate stores a reference to the vector used by _delete_rows.
+    _position_delete_rows_storage.insert(_position_delete_rows_storage.end(),
+                                         rows_it->second.begin(), rows_it->second.end());
+    std::sort(_position_delete_rows_storage.begin(), _position_delete_rows_storage.end());
+    _position_delete_rows_storage.erase(std::unique(_position_delete_rows_storage.begin(),
+                                                    _position_delete_rows_storage.end()),
+                                        _position_delete_rows_storage.end());
+    _delete_rows = &_position_delete_rows_storage;
+    return Status::OK();
+}
+
+// Fills the row-id virtual column at `column_idx` of `table_block` with
+// first_row_id + file row position, all values non-null. Does nothing when the split carries
+// no first_row_id. The row positions are taken from the row-position column of the file
+// block template, which must have exactly as many rows as `table_block`.
+Status IcebergTableReader::_materialize_row_lineage_row_id(Block* table_block, size_t column_idx) {
+    const int64_t first_row_id = _row_lineage_columns.first_row_id;
+    if (first_row_id < 0) {
+        return Status::OK();
+    }
+    DORIS_CHECK(_row_position_block_position < _data_reader.block_template.columns());
+    const auto& positions = assert_cast<const ColumnInt64&>(
+            *_data_reader.block_template.get_by_position(_row_position_block_position).column);
+    const size_t row_count = positions.size();
+    DORIS_CHECK(row_count == table_block->rows());
+
+    // Work on a full (non-const) mutable version of the target column.
+    auto target = table_block->get_by_position(column_idx)
+                          .column->convert_to_full_column_if_const()
+                          ->assume_mutable();
+    auto* nullable_target = assert_cast<ColumnNullable*>(target.get());
+    auto& null_flags = nullable_target->get_null_map_data();
+    auto& row_ids =
+            assert_cast<ColumnInt64&>(*nullable_target->get_nested_column_ptr()).get_data();
+
+    null_flags.resize(row_count);
+    std::fill(null_flags.begin(), null_flags.end(), 0);
+    row_ids.resize(row_count);
+    const auto& position_values = positions.get_data();
+    std::transform(position_values.begin(), position_values.end(), row_ids.begin(),
+                   [first_row_id](int64_t position) { return first_row_id + position; });
+
+    table_block->replace_by_position(column_idx, std::move(target));
+    return Status::OK();
+}
+
+// Replaces the column at `column_idx` of `table_block` with a constant column that repeats the
+// split's last_updated_sequence_number for every row of the block. Does nothing when the split
+// carries no such value.
+Status IcebergTableReader::_materialize_row_lineage_last_updated_sequence_number(
+        Block* table_block, size_t column_idx) {
+    if (_row_lineage_columns.last_updated_sequence_number < 0) {
+        return Status::OK();
+    }
+    const auto row_count = table_block->rows();
+    // A single-value column of the target type, wrapped so it reads as `row_count` rows.
+    auto single_value = table_block->get_by_position(column_idx).type->create_column();
+    single_value->insert(
+            Field::create_field<TYPE_BIGINT>(_row_lineage_columns.last_updated_sequence_number));
+    auto constant_column = ColumnConst::create(std::move(single_value), row_count);
+    table_block->replace_by_position(column_idx, std::move(constant_column));
+    return Status::OK();
+}
+
+// Reports whether any column mapping asks for the ROW_ID virtual column.
+bool IcebergTableReader::_need_row_lineage_row_id() const {
+    const auto& mappings = _data_reader.column_mapper.mappings();
+    return std::any_of(mappings.begin(), mappings.end(), [](const auto& mapping) {
+        return mapping.virtual_column_type == reader::TableVirtualColumnType::ROW_ID;
+    });
+}
+
+} // namespace doris::iceberg
diff --git a/be/src/format/table/iceberg_reader_v2.h
b/be/src/format/table/iceberg_reader_v2.h
index 6c6f4416717..fbc8e28441b 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -19,26 +19,24 @@
#include <cstddef>
#include <cstdint>
+#include <map>
#include <memory>
#include <string>
-#include <utility>
#include <vector>
#include "common/status.h"
-#include "core/assert_cast.h"
-#include "core/block/block.h"
-#include "core/column/column_const.h"
-#include "core/column/column_nullable.h"
-#include "core/column/column_vector.h"
-#include "core/data_type/define_primitive_type.h"
-#include "core/field.h"
-#include "format/new_parquet/column_reader.h"
#include "format/reader/file_reader.h"
#include "format/reader/table_reader.h"
+#include "format/table/iceberg_delete_file_reader_helper.h"
#include "gen_cpp/PlanNodes_types.h"
namespace doris {
class Block;
+struct DeleteFileDesc;
+namespace io {
+struct FileDescription;
+struct FileSystemProperties;
+} // namespace io
} // namespace doris
namespace doris::iceberg {
@@ -50,144 +48,92 @@ class IcebergTableReader : public reader::TableReader {
public:
~IcebergTableReader() override = default;
- Status prepare_split(const reader::SplitReadOptions& options) override {
- _row_lineage_columns = {};
- if (options.current_range.__isset.table_format_params &&
- options.current_range.table_format_params.__isset.iceberg_params) {
- const auto& iceberg_params =
options.current_range.table_format_params.iceberg_params;
- if (iceberg_params.__isset.first_row_id) {
- _row_lineage_columns.first_row_id =
iceberg_params.first_row_id;
- }
- if (iceberg_params.__isset.last_updated_sequence_number) {
- _row_lineage_columns.last_updated_sequence_number =
- iceberg_params.last_updated_sequence_number;
- }
- }
- return TableReader::prepare_split(options);
- }
+ Status prepare_split(const reader::SplitReadOptions& options) override;
protected:
// 将 file-local block 转换为 table/global schema block。
// 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
// 物化以及复杂列 remap。
- Status finalize_chunk(Block* block) override {
- // 真实实现会根据 ColumnMapping 执行 finalize_expr/default/partition/generated
- // expressions,把 file-local block 写成 table block。
- RETURN_IF_ERROR(apply_equality_deletes(block));
- return Status::OK();
- }
-
- // 物化 Iceberg 虚拟列。
- // 例如 _row_id、_last_updated_sequence_number 等,它们不来自 Parquet 文件物理列。
- Status materialize_virtual_columns(Block* table_block) override {
- for (size_t column_idx = 0; column_idx <
_data_reader.column_mapper.mappings().size();
- ++column_idx) {
- const auto& mapping =
_data_reader.column_mapper.mappings()[column_idx];
- switch (mapping.virtual_column_type) {
- case reader::TableVirtualColumnType::ROW_ID:
- RETURN_IF_ERROR(_materialize_row_lineage_row_id(table_block,
column_idx));
- break;
- case reader::TableVirtualColumnType::LAST_UPDATED_SEQUENCE_NUMBER:
-
RETURN_IF_ERROR(_materialize_row_lineage_last_updated_sequence_number(table_block,
-
column_idx));
- break;
- case reader::TableVirtualColumnType::INVALID:
- break;
- }
- }
- return Status::OK();
- }
-
- // 将 Iceberg position delete / deletion vector 转换成底层 reader 可消费的删除信息。
- // 这一步发生在读取 data file 前,因此会修改 FileScanRequest。
- Status apply_position_deletes(reader::FileScanRequest* request) {
- // 真实实现会把 position delete / deletion vector 转换成 file-local delete 信息。
- (void)request;
- return Status::OK();
- }
-
- Status customize_file_scan_request(reader::FileScanRequest* file_request)
override {
- if (_row_lineage_columns.first_row_id < 0 ||
!_need_row_lineage_row_id()) {
- return Status::OK();
- }
- DORIS_CHECK(file_request != nullptr);
- const auto row_position_column_id =
-
doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID;
- if (file_request->column_positions.count(row_position_column_id) > 0) {
- return Status::OK();
- }
- _row_position_block_position = file_request->column_positions.size();
- file_request->non_predicate_columns.push_back(row_position_column_id);
- file_request->column_positions.emplace(row_position_column_id,
- _row_position_block_position);
- _data_reader.block_schema.push_back(
-
doris::parquet::ParquetColumnReaderFactory::row_position_schema_field());
- return Status::OK();
- }
+ Status finalize_chunk(Block* block, const size_t rows) override;
+
+ Status materialize_virtual_columns(Block* table_block) override;
+
+ Status customize_file_scan_request(reader::FileScanRequest* file_request)
override;
+
+ Status _parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc* desc,
+ bool* has_delete_file) override;
+
+ Status _collect_position_delete_rows(const TTableFormatFileDesc& t_desc);
// 在 table block 上应用 equality delete。
// equality delete 依赖 table-level 列语义,因此不能下沉到 ParquetReader。
- Status apply_equality_deletes(Block* block) {
- // 真实实现会在 table block 上应用 equality delete。
- return Status::OK();
- }
+ Status apply_equality_deletes(Block* block);
private:
+ static constexpr int MIN_SUPPORT_DELETE_FILES_VERSION = 2;
+ static constexpr int POSITION_DELETE = 1;
+ static constexpr int EQUALITY_DELETE = 2;
+ static constexpr int DELETION_VECTOR = 3;
+
struct RowLineageColumns {
int64_t first_row_id = -1;
int64_t last_updated_sequence_number = -1;
};
- Status _materialize_row_lineage_row_id(Block* table_block, size_t
column_idx) {
- if (_row_lineage_columns.first_row_id < 0) {
- return Status::OK();
- }
- DORIS_CHECK(_row_position_block_position <
_data_reader.block_template.columns());
- const auto& row_position_column = assert_cast<const ColumnInt64&>(
-
*_data_reader.block_template.get_by_position(_row_position_block_position).column);
- DORIS_CHECK(row_position_column.size() == table_block->rows());
- auto column = table_block->get_by_position(column_idx)
- .column->convert_to_full_column_if_const()
- ->assume_mutable();
- auto* nullable_column = assert_cast<ColumnNullable*>(column.get());
- auto& null_map = nullable_column->get_null_map_data();
- auto& data =
-
assert_cast<ColumnInt64&>(*nullable_column->get_nested_column_ptr()).get_data();
- null_map.resize(row_position_column.size());
- std::fill(null_map.begin(), null_map.end(), 0);
- data.resize(row_position_column.size());
- for (size_t row = 0; row < row_position_column.size(); ++row) {
- data[row] = _row_lineage_columns.first_row_id +
row_position_column.get_element(row);
- }
- table_block->replace_by_position(column_idx, std::move(column));
- return Status::OK();
- }
+ static constexpr const char* ICEBERG_FILE_PATH = "file_path";
+ static constexpr const char* ICEBERG_ROW_POS = "pos";
+ static constexpr size_t ICEBERG_FILE_PATH_BLOCK_POSITION = 0;
+ static constexpr size_t ICEBERG_ROW_POS_BLOCK_POSITION = 1;
+
+ class PositionDeleteBlockCollector final {
+ public:
+ PositionDeleteBlockCollector(std::string data_file_path,
+ std::map<std::string,
reader::DeleteRows>* rows);
+
+ Status collect(const Block& block, size_t read_rows);
+
+ private:
+ std::string _data_file_path;
+ std::map<std::string, reader::DeleteRows>* _rows = nullptr;
+ };
+
+ static std::string _iceberg_delete_vector_cache_key(const
TIcebergDeleteFileDesc& delete_file);
+
+ static std::shared_ptr<io::FileSystemProperties>
_delete_file_system_properties(
+ const TFileScanRangeParams& scan_params);
+
+ static std::unique_ptr<io::FileDescription> _delete_file_description(const
TFileRangeDesc& range);
+
+ static const reader::SchemaField* _find_delete_field(
+ const std::vector<reader::SchemaField>& schema, const std::string&
name);
+
+ static Block _build_position_delete_block(const reader::SchemaField&
file_path_field,
+ const reader::SchemaField&
pos_field);
+
+ Status _append_row_position_output_column(reader::FileScanRequest*
request);
+
+ std::string _data_file_path() const;
+
+ Status _read_parquet_position_delete_file(const TIcebergDeleteFileDesc&
delete_file,
+ const TFileScanRangeParams&
scan_params,
+ IcebergDeleteFileIOContext*
delete_io_ctx,
+ PositionDeleteBlockCollector*
collector);
+
+ Status _read_position_delete_files(const
std::vector<TIcebergDeleteFileDesc>& delete_files);
+
+ Status _materialize_row_lineage_row_id(Block* table_block, size_t
column_idx);
Status _materialize_row_lineage_last_updated_sequence_number(Block*
table_block,
- size_t
column_idx) {
- if (_row_lineage_columns.last_updated_sequence_number < 0) {
- return Status::OK();
- }
- const auto rows = table_block->rows();
- auto data_column =
table_block->get_by_position(column_idx).type->create_column();
- data_column->insert(Field::create_field<TYPE_BIGINT>(
- _row_lineage_columns.last_updated_sequence_number));
- auto column = ColumnConst::create(std::move(data_column), rows);
- table_block->replace_by_position(column_idx, std::move(column));
- return Status::OK();
- }
+ size_t
column_idx);
RowLineageColumns _row_lineage_columns;
size_t _row_position_block_position = 0;
+ const TIcebergFileDesc* _iceberg_params = nullptr;
+ bool _delete_predicates_initialized = false;
+ reader::DeleteRows _position_delete_rows_storage;
+ std::vector<TIcebergDeleteFileDesc> _equality_delete_files;
- bool _need_row_lineage_row_id() const {
- for (const auto& mapping : _data_reader.column_mapper.mappings()) {
- if (mapping.virtual_column_type ==
reader::TableVirtualColumnType::ROW_ID) {
- return true;
- }
- }
- return false;
- }
+ bool _need_row_lineage_row_id() const;
};
} // namespace doris::iceberg
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp
b/be/test/format/new_parquet/parquet_reader_test.cpp
index 00938482d6c..0be12c27129 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -42,6 +42,8 @@
#include "exprs/vexpr_context.h"
#include "format/new_parquet/column_reader.h"
#include "format/reader/column_mapper.h"
+#include "format/reader/expr/delete_predicate.h"
+#include "format/reader/expr/slot_ref.h"
#include "format/reader/file_reader.h"
#include "format/reader/table_reader.h"
#include "gen_cpp/Types_types.h"
@@ -655,6 +657,98 @@ TEST_F(NewParquetReaderTest,
RowPositionReaderKeepsPositionsAfterSelection) {
EXPECT_EQ(row_position_column.get_element(2), 4);
}
+// Verifies that a DeletePredicate attached to the row-position column removes exactly the
+// listed file positions (1 and 3): three rows remain, with ids 1, 3, 5 at positions 0, 2, 4.
+TEST_F(NewParquetReaderTest, DeletePredicateFiltersRowPositions) {
+ auto reader = create_reader();
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ ASSERT_TRUE(reader->init(&state).ok());
+
+ std::vector<reader::SchemaField> schema;
+ ASSERT_TRUE(reader->get_schema(&schema).ok());
+ Block block = build_file_block_with_row_position(schema);
+
+ // NOTE(review): presumably static because DeletePredicate keeps a reference to the
+ // vector rather than copying it -- confirm against DeletePredicate.
+ static const std::vector<int64_t> deleted_rows {1, 3};
+ auto delete_predicate = std::make_shared<DeletePredicate>(deleted_rows);
+ delete_predicate->add_child(TableSlotRef::create_shared(
+ 2, 2, -1, std::make_shared<DataTypeInt64>(),
+ parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME));
+
+ // Row position is read as a predicate column into block position 2; file column 0 is
+ // read as a plain output column into block position 0.
+ auto request = std::make_unique<reader::FileScanRequest>();
+ request->predicate_columns =
{parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID};
+ request->non_predicate_columns = {0};
+ request->column_positions = {
+ {0, 0},
+ {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
+ };
+ reader::FileExpressionFilter delete_filter;
+ delete_filter.delete_conjunct =
VExprContext::create_shared(std::move(delete_predicate));
+ delete_filter.file_column_ids.push_back(
+ parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID);
+ request->expression_filters.push_back(std::move(delete_filter));
+ ASSERT_TRUE(reader->open(request).ok());
+
+ size_t rows = 0;
+ bool eof = false;
+ ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+ EXPECT_FALSE(eof);
+ ASSERT_EQ(rows, 3);
+
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ const auto& row_position_column =
+ assert_cast<const ColumnInt64&>(*block.get_by_position(2).column);
+ EXPECT_EQ(id_column.get_element(0), 1);
+ EXPECT_EQ(id_column.get_element(1), 3);
+ EXPECT_EQ(id_column.get_element(2), 5);
+ EXPECT_EQ(row_position_column.get_element(0), 0);
+ EXPECT_EQ(row_position_column.get_element(1), 2);
+ EXPECT_EQ(row_position_column.get_element(2), 4);
+}
+
+// Verifies that a query conjunct and a delete conjunct placed in the same expression filter
+// are both applied: with position 3 deleted, two rows remain (ids 3 and 5 at positions 2, 4).
+TEST_F(NewParquetReaderTest,
QueryPredicateAndDeletePredicateFilterRowPositions) {
+ auto reader = create_reader();
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ ASSERT_TRUE(reader->init(&state).ok());
+
+ std::vector<reader::SchemaField> schema;
+ ASSERT_TRUE(reader->get_schema(&schema).ok());
+ Block block = build_file_block_with_row_position(schema);
+
+ // NOTE(review): presumably static because DeletePredicate keeps a reference to the
+ // vector rather than copying it -- confirm against DeletePredicate.
+ static const std::vector<int64_t> deleted_rows {3};
+ auto delete_predicate = std::make_shared<DeletePredicate>(deleted_rows);
+ delete_predicate->add_child(TableSlotRef::create_shared(
+ 2, 2, -1, std::make_shared<DataTypeInt64>(),
+ parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_NAME));
+
+ // Both file column 0 and the row-position column are predicate columns here.
+ auto request = std::make_unique<reader::FileScanRequest>();
+ request->predicate_columns = {0,
parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID};
+ request->non_predicate_columns = {};
+ request->column_positions = {
+ {0, 0},
+ {parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID, 2},
+ };
+ reader::FileExpressionFilter expression_filter;
+ // NOTE(review): presumably builds "column 0 > 2", which matches the expected ids below.
+ expression_filter.conjunct = create_int32_greater_than_conjunct(0, 2);
+ expression_filter.delete_conjunct =
VExprContext::create_shared(std::move(delete_predicate));
+ expression_filter.file_column_ids.push_back(0);
+ expression_filter.file_column_ids.push_back(
+ parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID);
+ request->expression_filters.push_back(std::move(expression_filter));
+ ASSERT_TRUE(reader->open(request).ok());
+
+ size_t rows = 0;
+ bool eof = false;
+ ASSERT_TRUE(reader->get_block(&block, &rows, &eof).ok());
+ EXPECT_FALSE(eof);
+ ASSERT_EQ(rows, 2);
+
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ const auto& row_position_column =
+ assert_cast<const ColumnInt64&>(*block.get_by_position(2).column);
+ EXPECT_EQ(id_column.get_element(0), 3);
+ EXPECT_EQ(id_column.get_element(1), 5);
+ EXPECT_EQ(row_position_column.get_element(0), 2);
+ EXPECT_EQ(row_position_column.get_element(1), 4);
+}
+
TEST_F(NewParquetReaderTest,
RowPositionReaderUsesFileLocalPositionsForScanRange) {
write_parquet_file(_file_path, 2);
auto parquet_file_reader =
::parquet::ParquetFileReader::OpenFile(_file_path, false);
diff --git a/be/test/format/reader/table_reader_test.cpp
b/be/test/format/reader/table_reader_test.cpp
index dc050976836..8705775485f 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -25,6 +25,7 @@
#include <algorithm>
#include <filesystem>
+#include <fstream>
#include <memory>
#include <string>
#include <vector>
@@ -37,10 +38,16 @@
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
+#include "exec/common/endian.h"
#include "exprs/vexpr.h"
+#include "format/format_common.h"
#include "format/reader/expr/slot_ref.h"
+#include "format/table/deletion_vector_reader.h"
#include "format/table/iceberg_reader_v2.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "io/io_common.h"
+#include "roaring/roaring64map.hh"
+#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "storage/predicate/predicate_creator.h"
@@ -80,6 +87,58 @@ private:
const std::string _expr_name = "TableInt32GreaterThanExpr";
};
+class IcebergTableReaderDeleteFileTestHelper final : public
doris::iceberg::IcebergTableReader {
+public:
+ Status parse_deletion_vector_file(const TTableFormatFileDesc& t_desc,
DeleteFileDesc* desc,
+ bool* has_delete_file) {
+ return _parse_deletion_vector_file(t_desc, desc, has_delete_file);
+ }
+};
+
+class IcebergTableReaderScanRequestTestHelper final : public
doris::iceberg::IcebergTableReader {
+public:
+ Status init_for_scan_request_test(std::vector<TableColumn>
projected_columns) {
+ _query_options = std::make_unique<TQueryOptions>();
+ _query_globals = std::make_unique<TQueryGlobals>();
+ _state = std::make_unique<RuntimeState>(*_query_options,
*_query_globals);
+ RETURN_IF_ERROR(init({
+ .projected_columns = std::move(projected_columns),
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = _state.get(),
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ }));
+
+ SplitReadOptions split_options;
+ split_options.current_range.__set_path("scan-request-test.parquet");
+ TTableFormatFileDesc table_format_params;
+ TIcebergFileDesc iceberg_params;
+ iceberg_params.__set_first_row_id(1000);
+ table_format_params.__set_iceberg_params(iceberg_params);
+
split_options.current_range.__set_table_format_params(table_format_params);
+ RETURN_IF_ERROR(prepare_split(split_options));
+
+ _delete_rows_storage = {1};
+ _delete_rows = &_delete_rows_storage;
+ return Status::OK();
+ }
+
+ Status customize_request(FileScanRequest* request) {
+ return customize_file_scan_request(request);
+ }
+
+private:
+ std::unique_ptr<TQueryOptions> _query_options;
+ std::unique_ptr<TQueryGlobals> _query_globals;
+ std::unique_ptr<RuntimeState> _state;
+ DeleteRows _delete_rows_storage;
+};
+
class TableInt32SumGreaterThanExpr final : public VExpr {
public:
TableInt32SumGreaterThanExpr(int left_slot_id, int left_column_id, int
right_slot_id,
@@ -174,6 +233,14 @@ std::shared_ptr<arrow::Array> build_int32_array(const
std::vector<int32_t>& valu
return finish_array(&builder);
}
+std::shared_ptr<arrow::Array> build_int64_array(const std::vector<int64_t>&
values) {
+ arrow::Int64Builder builder;
+ for (const auto value : values) {
+ EXPECT_TRUE(builder.Append(value).ok());
+ }
+ return finish_array(&builder);
+}
+
std::shared_ptr<arrow::Array> build_string_array(const
std::vector<std::string>& values) {
arrow::StringBuilder builder;
for (const auto& value : values) {
@@ -227,6 +294,54 @@ void write_int_pair_parquet_file(const std::string&
file_path, const std::vector
write_row_group_size,
builder.build()));
}
+void write_position_delete_parquet_file(const std::string& file_path,
+ const std::vector<std::string>&
data_file_paths,
+ const std::vector<int64_t>& positions)
{
+ auto schema = arrow::schema({
+ arrow::field("file_path", arrow::utf8(), false),
+ arrow::field("pos", arrow::int64(), false),
+ });
+ auto table = arrow::Table::Make(schema,
+ {build_string_array(data_file_paths),
+ build_int64_array(positions)});
+
+ auto file_result = arrow::io::FileOutputStream::Open(file_path);
+ ASSERT_TRUE(file_result.ok()) << file_result.status();
+ std::shared_ptr<arrow::io::FileOutputStream> out = *file_result;
+
+ ::parquet::WriterProperties::Builder builder;
+ builder.version(::parquet::ParquetVersion::PARQUET_2_6);
+ builder.data_page_version(::parquet::ParquetDataPageVersion::V2);
+ builder.compression(::parquet::Compression::UNCOMPRESSED);
+ PARQUET_THROW_NOT_OK(::parquet::arrow::WriteTable(
+ *table, arrow::default_memory_pool(), out,
static_cast<int64_t>(positions.size()),
+ builder.build()));
+}
+
+int64_t write_iceberg_deletion_vector_file(const std::string& file_path,
+ const std::vector<uint64_t>&
deleted_positions) {
+ roaring::Roaring64Map rows;
+ for (const auto position : deleted_positions) {
+ rows.add(position);
+ }
+
+ const size_t bitmap_size = rows.getSizeInBytes();
+ std::vector<char> blob(4 + 4 + bitmap_size + 4);
+ rows.write(blob.data() + 8);
+
+ const uint32_t total_length = static_cast<uint32_t>(4 + bitmap_size);
+ BigEndian::Store32(blob.data(), total_length);
+ constexpr char DV_MAGIC[] = {'\xD1', '\xD3', '\x39', '\x64'};
+ memcpy(blob.data() + 4, DV_MAGIC, 4);
+ BigEndian::Store32(blob.data() + 8 + bitmap_size, 0);
+
+ std::ofstream output(file_path, std::ios::binary);
+ EXPECT_TRUE(output.is_open());
+ output.write(blob.data(), static_cast<std::streamsize>(blob.size()));
+ EXPECT_TRUE(output.good());
+ return static_cast<int64_t>(blob.size());
+}
+
Block build_table_block(const std::vector<TableColumn>& columns) {
Block block;
for (const auto& column : columns) {
@@ -266,6 +381,81 @@ void set_iceberg_row_lineage_params(SplitReadOptions*
split_options, int64_t fir
split_options->current_range.__set_table_format_params(table_format_params);
}
+TIcebergDeleteFileDesc make_iceberg_deletion_vector(const std::string& path,
int64_t offset,
+ int64_t size) {
+ TIcebergDeleteFileDesc delete_file;
+ delete_file.__set_content(3);
+ delete_file.__set_path(path);
+ delete_file.__set_content_offset(offset);
+ delete_file.__set_content_size_in_bytes(size);
+ return delete_file;
+}
+
+TIcebergDeleteFileDesc make_iceberg_position_delete_file(const std::string&
path) {
+ TIcebergDeleteFileDesc delete_file;
+ delete_file.__set_content(1);
+ delete_file.__set_path(path);
+ delete_file.__set_file_format(TFileFormatType::FORMAT_PARQUET);
+ return delete_file;
+}
+
+TFileScanRangeParams make_local_parquet_scan_params() {
+ TFileScanRangeParams scan_params;
+ scan_params.__set_file_type(TFileType::FILE_LOCAL);
+ scan_params.__set_format_type(TFileFormatType::FORMAT_PARQUET);
+ return scan_params;
+}
+
+std::shared_ptr<io::IOContext> make_io_context(io::FileReaderStats*
file_reader_stats,
+ io::FileCacheStatistics*
file_cache_stats) {
+ auto io_ctx = std::make_shared<io::IOContext>();
+ io_ctx->file_reader_stats = file_reader_stats;
+ io_ctx->file_cache_stats = file_cache_stats;
+ return io_ctx;
+}
+
+std::unique_ptr<ReadProfile> make_table_read_profile(RuntimeProfile* profile) {
+ auto read_profile = std::make_unique<ReadProfile>();
+ read_profile->num_delete_files = ADD_COUNTER(profile, "NumDeleteFiles",
TUnit::UNIT);
+ read_profile->num_delete_rows = ADD_COUNTER(profile, "NumDeleteRows",
TUnit::UNIT);
+ read_profile->parse_delete_file_time = ADD_TIMER(profile,
"ParseDeleteFileTime");
+ return read_profile;
+}
+
+TTableFormatFileDesc make_iceberg_table_format_desc(
+ const std::string& data_file_path, const
std::vector<TIcebergDeleteFileDesc>& delete_files) {
+ TTableFormatFileDesc table_format_params;
+ TIcebergFileDesc iceberg_params;
+ iceberg_params.__set_format_version(2);
+ iceberg_params.__set_original_file_path(data_file_path);
+ iceberg_params.__set_delete_files(delete_files);
+ table_format_params.__set_iceberg_params(iceberg_params);
+ return table_format_params;
+}
+
+std::vector<int32_t> read_iceberg_ids(
+ doris::iceberg::IcebergTableReader* reader,
+ const std::vector<TableColumn>& projected_columns) {
+ std::vector<int32_t> ids;
+ bool eos = false;
+ while (!eos) {
+ Block block = build_table_block(projected_columns);
+ auto status = reader->get_block(&block, &eos);
+ if (!status.ok()) {
+ ADD_FAILURE() << status;
+ return ids;
+ }
+ if (block.rows() == 0) {
+ continue;
+ }
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ for (size_t row = 0; row < block.rows(); ++row) {
+ ids.push_back(id_column.get_element(row));
+ }
+ }
+ return ids;
+}
+
int64_t parquet_column_start_offset(const ::parquet::ColumnChunkMetaData&
column_metadata) {
return column_metadata.has_dictionary_page()
?
static_cast<int64_t>(column_metadata.dictionary_page_offset())
@@ -936,6 +1126,217 @@ TEST(TableReaderTest,
IcebergVirtualColumnsKeepRowLineageAfterRowGroupPredicateP
std::filesystem::remove_all(test_dir);
}
+TEST(TableReaderTest, IcebergDeletionVectorUsesTableReaderDeleteFileInterface)
{
+ TTableFormatFileDesc table_format_desc;
+ TIcebergFileDesc iceberg_desc;
+ iceberg_desc.__set_format_version(2);
+ iceberg_desc.__set_delete_files({make_iceberg_deletion_vector("dv.bin", 8,
128)});
+ table_format_desc.__set_iceberg_params(iceberg_desc);
+
+ IcebergTableReaderDeleteFileTestHelper reader;
+ DeleteFileDesc desc;
+ bool has_delete_file = false;
+ ASSERT_TRUE(reader.parse_deletion_vector_file(table_format_desc, &desc,
&has_delete_file).ok());
+
+ EXPECT_TRUE(has_delete_file);
+ EXPECT_EQ(desc.path, "dv.bin");
+ EXPECT_EQ(desc.start_offset, 8);
+ EXPECT_EQ(desc.size, 128);
+ EXPECT_EQ(desc.file_size, -1);
+ EXPECT_EQ(desc.format, DeleteFileDesc::Format::ICEBERG);
+}
+
+TEST(TableReaderTest, IcebergDeletionVectorRejectsMultipleDeleteFiles) {
+ TTableFormatFileDesc table_format_desc;
+ TIcebergFileDesc iceberg_desc;
+ iceberg_desc.__set_format_version(2);
+ iceberg_desc.__set_delete_files({make_iceberg_deletion_vector("dv-a.bin",
8, 128),
+ make_iceberg_deletion_vector("dv-b.bin",
16, 256)});
+ table_format_desc.__set_iceberg_params(iceberg_desc);
+
+ IcebergTableReaderDeleteFileTestHelper reader;
+ DeleteFileDesc desc;
+ bool has_delete_file = false;
+ auto status = reader.parse_deletion_vector_file(table_format_desc, &desc,
&has_delete_file);
+
+ EXPECT_FALSE(status.ok());
+}
+
+TEST(TableReaderTest, IcebergTableReaderAppliesDeletionVectorFile) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_iceberg_deletion_vector_file_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ const auto dv_path = (test_dir / "delete-vector.bin").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40,
50},
+ {"one", "two", "three", "four", "five"});
+ const auto dv_size = write_iceberg_deletion_vector_file(dv_path, {0, 4});
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeProfile profile("test_profile");
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ auto scan_params = make_local_parquet_scan_params();
+ io::FileReaderStats file_reader_stats;
+ io::FileCacheStatistics file_cache_stats;
+ auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+ ShardedKVCache cache(1);
+ doris::iceberg::IcebergTableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = &scan_params,
+ .io_ctx = io_ctx,
+ .runtime_state = &state,
+ .scanner_profile = &profile,
+ .allow_missing_columns = true,
+ .profile =
make_table_read_profile(&profile),
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ split_options.cache = &cache;
+
split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc(
+ file_path, {make_iceberg_deletion_vector(dv_path, 0, dv_size)}));
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ EXPECT_EQ(read_iceberg_ids(&reader, projected_columns),
std::vector<int32_t>({2, 3, 4}));
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest, IcebergTableReaderAppliesPositionDeleteFile) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_iceberg_position_delete_file_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ const auto delete_file_path = (test_dir /
"position-delete.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40,
50},
+ {"one", "two", "three", "four", "five"});
+ write_position_delete_parquet_file(delete_file_path, {file_path,
file_path}, {1, 3});
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeProfile profile("test_profile");
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ auto scan_params = make_local_parquet_scan_params();
+ io::FileReaderStats file_reader_stats;
+ io::FileCacheStatistics file_cache_stats;
+ auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+ ShardedKVCache cache(1);
+ doris::iceberg::IcebergTableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = &scan_params,
+ .io_ctx = io_ctx,
+ .runtime_state = &state,
+ .scanner_profile = &profile,
+ .allow_missing_columns = true,
+ .profile =
make_table_read_profile(&profile),
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ split_options.cache = &cache;
+
split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc(
+ file_path, {make_iceberg_position_delete_file(delete_file_path)}));
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ EXPECT_EQ(read_iceberg_ids(&reader, projected_columns),
std::vector<int32_t>({1, 3, 5}));
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest,
IcebergTableReaderMergesDeletionVectorAndPositionDeleteFiles) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_iceberg_delete_files_merge_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ const auto dv_path = (test_dir / "delete-vector.bin").string();
+ const auto position_delete_path = (test_dir /
"position-delete.parquet").string();
+ write_int_pair_parquet_file(file_path, {1, 2, 3, 4, 5}, {10, 20, 30, 40,
50},
+ {"one", "two", "three", "four", "five"});
+ const auto dv_size = write_iceberg_deletion_vector_file(dv_path, {0});
+ write_position_delete_parquet_file(position_delete_path, {file_path,
file_path}, {3, 3});
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeProfile profile("test_profile");
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ auto scan_params = make_local_parquet_scan_params();
+ io::FileReaderStats file_reader_stats;
+ io::FileCacheStatistics file_cache_stats;
+ auto io_ctx = make_io_context(&file_reader_stats, &file_cache_stats);
+ ShardedKVCache cache(1);
+ doris::iceberg::IcebergTableReader reader;
+ ASSERT_TRUE(reader.init({
+ .projected_columns = projected_columns,
+ .column_predicates = {},
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = &scan_params,
+ .io_ctx = io_ctx,
+ .runtime_state = &state,
+ .scanner_profile = &profile,
+ .allow_missing_columns = true,
+ .profile =
make_table_read_profile(&profile),
+ })
+ .ok());
+
+ auto split_options = build_split_options(file_path);
+ split_options.cache = &cache;
+
split_options.current_range.__set_table_format_params(make_iceberg_table_format_desc(
+ file_path, {make_iceberg_deletion_vector(dv_path, 0, dv_size),
+
make_iceberg_position_delete_file(position_delete_path)}));
+ ASSERT_TRUE(reader.prepare_split(split_options).ok());
+
+ EXPECT_EQ(read_iceberg_ids(&reader, projected_columns),
std::vector<int32_t>({2, 3, 5}));
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest,
RowPositionDeletePredicateColumnIsNotRepeatedAsOutputColumn) {
+ const auto row_position_column_id =
+ doris::parquet::ParquetColumnReaderFactory::ROW_POSITION_COLUMN_ID;
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(
+ make_table_column(100, "_row_id",
make_nullable(std::make_shared<DataTypeInt64>())));
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ IcebergTableReaderScanRequestTestHelper reader;
+ ASSERT_TRUE(reader.init_for_scan_request_test(projected_columns).ok());
+
+ FileScanRequest request;
+ request.non_predicate_columns.push_back(0);
+ request.column_positions.emplace(0, 0);
+
+ ASSERT_TRUE(reader.customize_request(&request).ok());
+
+ EXPECT_EQ(request.predicate_columns,
std::vector<ColumnId>({row_position_column_id}));
+ EXPECT_EQ(request.non_predicate_columns, std::vector<ColumnId>({0}));
+ ASSERT_TRUE(request.column_positions.contains(row_position_column_id));
+ EXPECT_EQ(request.column_positions.at(row_position_column_id), 1);
+ ASSERT_EQ(request.expression_filters.size(), 1);
+ EXPECT_NE(request.expression_filters[0].delete_conjunct, nullptr);
+}
+
TEST(TableReaderTest, ParquetReaderReadsOnlyRowGroupsInFileRange) {
const auto test_dir =
std::filesystem::temp_directory_path() /
"doris_table_reader_file_range_test";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]