This is an automated email from the ASF dual-hosted git repository.
hubgeter pushed a commit to branch iceberg-v3-test-branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/iceberg-v3-test-branch-4.1 by this push:
new 582d219d855 [fix](iceberg) Add missing Iceberg field IDs for position delete files
582d219d855 is described below
commit 582d219d8555b736650b2dea9c34cc2d172c1a0e
Author: daidai <[email protected]>
AuthorDate: Thu May 21 17:38:08 2026 +0800
[fix](iceberg) Add missing Iceberg field IDs for position delete files
---
.../writer/iceberg/viceberg_delete_file_writer.cpp | 25 ++++++++++--
.../writer/iceberg/viceberg_delete_file_writer.h | 4 ++
be/test/exec/sink/viceberg_delete_sink_test.cpp | 47 ++++++++++++++++++++++
3 files changed, 72 insertions(+), 4 deletions(-)
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
index 8d1724a4396..968ac987e9a 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
+++ b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.cpp
@@ -19,6 +19,8 @@
#include <fmt/format.h>
+#include "format/table/iceberg/schema.h"
+#include "format/table/iceberg/types.h"
#include "format/transformer/vorc_transformer.h"
#include "format/transformer/vparquet_transformer.h"
#include "io/file_factory.h"
@@ -26,6 +28,20 @@
namespace doris {
+// Iceberg reserved field IDs for position delete files (Iceberg spec, "Reserved Field IDs").
+constexpr int POSITION_DELETE_FILE_PATH_ID = 2147483546;
+constexpr int POSITION_DELETE_POS_ID = 2147483545;
+// File-local: (file_path, pos) schema given to the Parquet/ORC transformers to carry these IDs.
+static std::unique_ptr<iceberg::Schema> build_position_delete_schema() {
+    std::vector<iceberg::NestedField> fields;
+    fields.reserve(2);
+    fields.emplace_back(false, POSITION_DELETE_FILE_PATH_ID, "file_path",
+                        std::make_unique<iceberg::StringType>(), std::nullopt);
+    fields.emplace_back(false, POSITION_DELETE_POS_ID, "pos", std::make_unique<iceberg::LongType>(),
+                        std::nullopt);
+    return std::make_unique<iceberg::Schema>(std::move(fields));
+}
+
VIcebergDeleteFileWriter::VIcebergDeleteFileWriter(TFileContent::type
delete_type,
const std::string&
output_path,
TFileFormatType::type
file_format,
@@ -46,6 +62,7 @@ Status VIcebergDeleteFileWriter::open(RuntimeState* state,
RuntimeProfile* profi
if (_delete_type != TFileContent::POSITION_DELETES) {
return Status::NotSupported("Iceberg delete file writer only supports
position deletes");
}
+ _position_delete_schema = build_position_delete_schema();
_state = state;
@@ -83,15 +100,15 @@ Status VIcebergDeleteFileWriter::open(RuntimeState* state,
RuntimeProfile* profi
ParquetFileOptions parquet_options = {parquet_compression_type,
TParquetVersion::PARQUET_1_0,
false, false};
- _file_format_transformer.reset(new VParquetTransformer(state,
_file_writer.get(),
- output_exprs,
column_names, false,
-
parquet_options, nullptr, nullptr));
+ _file_format_transformer.reset(new VParquetTransformer(
+ state, _file_writer.get(), output_exprs, column_names, false,
parquet_options,
+ nullptr, _position_delete_schema.get()));
return _file_format_transformer->open();
}
case TFileFormatType::FORMAT_ORC: {
_file_format_transformer.reset(new VOrcTransformer(state,
_file_writer.get(), output_exprs,
"", column_names,
false, _compress_type,
- nullptr));
+
_position_delete_schema.get()));
return _file_format_transformer->open();
}
default:
diff --git a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h
index e5de7143f2b..c242731dc15 100644
--- a/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h
+++ b/be/src/exec/sink/writer/iceberg/viceberg_delete_file_writer.h
@@ -32,6 +32,9 @@ namespace doris {
class RuntimeState;
class RuntimeProfile;
class ObjectPool;
+namespace iceberg {
+class Schema;
+}
namespace io {
class FileSystem;
@@ -103,6 +106,7 @@ private:
RuntimeState* _state = nullptr;
std::shared_ptr<io::FileSystem> _fs;
io::FileWriterPtr _file_writer;
+ std::unique_ptr<iceberg::Schema> _position_delete_schema;
std::unique_ptr<VFileFormatTransformer> _file_format_transformer;
int32_t _partition_spec_id = 0;
diff --git a/be/test/exec/sink/viceberg_delete_sink_test.cpp b/be/test/exec/sink/viceberg_delete_sink_test.cpp
index d9fc5086503..5af028b6b70 100644
--- a/be/test/exec/sink/viceberg_delete_sink_test.cpp
+++ b/be/test/exec/sink/viceberg_delete_sink_test.cpp
@@ -18,6 +18,8 @@
#include "exec/sink/viceberg_delete_sink.h"
#include <gtest/gtest.h>
+#include <parquet/api/reader.h>
+#include <parquet/schema.h>
#include <rapidjson/document.h>
#include <filesystem>
@@ -35,7 +37,9 @@
#include "exec/common/endian.h"
#include "gen_cpp/DataSinks_types.h"
#include "gen_cpp/Types_types.h"
+#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
+#include "testutil/mock/mock_runtime_state.h"
#include "util/uid_util.h"
namespace doris {
@@ -480,6 +484,49 @@ TEST_F(VIcebergDeleteSinkTest, TestGenerateDeleteFilePath)
{
ASSERT_NE(std::string::npos, delete_file_path.find("delete_pos_"));
}
+TEST_F(VIcebergDeleteSinkTest, TestWritePositionDeleteParquetFieldIds) {
+    std::filesystem::path temp_dir = std::filesystem::temp_directory_path() /
+                                     ("iceberg_position_delete_test_" + generate_uuid_string());
+    ASSERT_TRUE(std::filesystem::create_directories(temp_dir));
+    // Best-effort cleanup that also runs when an ASSERT below returns early.
+    struct TempDirGuard {
+        std::filesystem::path dir;
+        ~TempDirGuard() { std::error_code ec; std::filesystem::remove_all(dir, ec); }
+    } temp_dir_guard {temp_dir};
+
+    TDataSink t_data_sink = build_local_delete_sink(temp_dir.string(), 2);
+    VExprContextSPtrs output_exprs;
+    auto sink = std::make_shared<VIcebergDeleteSink>(t_data_sink, output_exprs, nullptr, nullptr);
+    ObjectPool pool;
+    ASSERT_TRUE(sink->init_properties(&pool).ok());
+    MockRuntimeState state;
+    RuntimeProfile profile("iceberg_delete_sink");
+    ASSERT_TRUE(sink->open(&state, &profile).ok());
+
+    std::map<std::string, IcebergFileDeletion> file_deletions;
+    auto [file_it, inserted] =
+            file_deletions.emplace("file1.parquet", IcebergFileDeletion(1, "[\"p=1\"]"));
+    ASSERT_TRUE(inserted);
+    file_it->second.rows_to_delete.add((uint32_t)10);
+    file_it->second.rows_to_delete.add((uint32_t)20);
+    ASSERT_TRUE(sink->_write_position_delete_files(file_deletions).ok());
+    ASSERT_EQ(1, sink->_commit_data_list.size());
+
+    const auto& commit_data = sink->_commit_data_list[0];
+    std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+            parquet::ParquetFileReader::OpenFile(commit_data.file_path, false);
+    std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
+    const parquet::schema::GroupNode& group_node = *file_metadata->schema()->group_node();
+    ASSERT_EQ(2, group_node.field_count());
+    auto file_path_field = group_node.field(0);
+    auto pos_field = group_node.field(1);
+    // Expected values are the Iceberg reserved field IDs for file_path and pos.
+    EXPECT_EQ("file_path", file_path_field->name());
+    EXPECT_EQ(2147483546, file_path_field->field_id());
+    EXPECT_EQ("pos", pos_field->name());
+    EXPECT_EQ(2147483545, pos_field->field_id());
+}
+
TEST_F(VIcebergDeleteSinkTest, TestUnsupportedDeleteType) {
// Create a TDataSink for an unsupported delete type
TDataSink t_eq_delete_sink;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]