This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new bfae4d0b3 IMPALA-14496: Impala crashes when it writes multiple delete
files per partition in a single DELETE operation
bfae4d0b3 is described below
commit bfae4d0b32dfb035495831f003f30b2d336d1ec6
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Oct 14 10:11:15 2025 +0200
IMPALA-14496: Impala crashes when it writes multiple delete files per
partition in a single DELETE operation
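Impala crashes when it needs to write multiple delete files per
partition in a single DELETE operation. This is because
IcebergBufferedDeleteSink has its own DmlExecState object, but some
methods in TableSinkBase use the RuntimeState's DmlExecState object
instead. For example, when the writer rolls over to a new delete file,
WriteRowsToPartition() finalizes the current file via
FinalizePartitionFile() without passing the sink's DmlExecState, so
the RuntimeState's DmlExecState is used. As a result, a partition can
be added to the IcebergBufferedDeleteSink's DmlExecState but later be
expected in the RuntimeState's DmlExecState.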
This patch adds new TableSinkBase methods that are specific to writing
delete files (WriteDeleteRowsToPartition() and
FinalizeDeletePartitionFile()); they always take the DmlExecState
object as an explicit parameter. IcebergBufferedDeleteSink now uses
these methods.
Testing
* Added an e2e test that writes multiple delete files per partition
  in a single DELETE operation.
Change-Id: I46266007a6356e9ff3b63369dd855aff1396bb72
Reviewed-on: http://gerrit.cloudera.org:8080/23537
Reviewed-by: Mihaly Szjatinya <[email protected]>
Reviewed-by: Michael Smith <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
be/src/exec/iceberg-buffered-delete-sink.cc | 7 ++-
be/src/exec/table-sink-base.cc | 68 +++++++++++++++++-----
be/src/exec/table-sink-base.h | 26 ++++++++-
.../iceberg-multiple-delete-per-partition.test | 49 ++++++++++++++++
tests/query_test/test_iceberg.py | 5 ++
5 files changed, 135 insertions(+), 20 deletions(-)
diff --git a/be/src/exec/iceberg-buffered-delete-sink.cc
b/be/src/exec/iceberg-buffered-delete-sink.cc
index 6b34bb319..d38cb6fb4 100644
--- a/be/src/exec/iceberg-buffered-delete-sink.cc
+++ b/be/src/exec/iceberg-buffered-delete-sink.cc
@@ -270,11 +270,12 @@ Status
IcebergBufferedDeleteSink::FlushBufferedRecords(RuntimeState* state) {
row_batch.Reset();
RETURN_IF_ERROR(GetNextRowBatch(&row_batch, &it));
row_batch.VLogRows("IcebergBufferedDeleteSink");
- RETURN_IF_ERROR(WriteRowsToPartition(state, &row_batch,
current_partition_.get()));
+ RETURN_IF_ERROR(WriteDeleteRowsToPartition(state, &row_batch,
+ current_partition_.get(), &dml_exec_state_));
}
DCHECK(current_partition_ != nullptr);
- RETURN_IF_ERROR(FinalizePartitionFile(state, current_partition_.get(),
- /*is_delete=*/true, &dml_exec_state_));
+ RETURN_IF_ERROR(FinalizeDeletePartitionFile(state,
current_partition_.get(),
+ &dml_exec_state_));
current_partition_->writer->Close();
}
return Status::OK();
diff --git a/be/src/exec/table-sink-base.cc b/be/src/exec/table-sink-base.cc
index 598c50492..2b8334d1c 100644
--- a/be/src/exec/table-sink-base.cc
+++ b/be/src/exec/table-sink-base.cc
@@ -375,18 +375,7 @@ Status TableSinkBase::WriteRowsToPartition(
// set.
bool new_file;
while (true) {
- Status status =
- output_partition->writer->AppendRows(batch, indices, &new_file);
- if (!status.ok()) {
- // IMPALA-10607: Deletes partition file if staging is skipped when
appending rows
- // fails. Otherwise, it leaves the file in un-finalized state.
- if (ShouldSkipStaging(state, output_partition)) {
- status.MergeStatus(ClosePartitionFile(state, output_partition));
- hdfsDelete(output_partition->hdfs_connection,
- output_partition->current_file_name.c_str(), 0);
- }
- return status;
- }
+ RETURN_IF_ERROR(WriteRowsToFile(state, batch, output_partition, indices,
&new_file));
if (!new_file) break;
RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
@@ -394,6 +383,41 @@ Status TableSinkBase::WriteRowsToPartition(
return Status::OK();
}
+Status TableSinkBase::WriteDeleteRowsToPartition(
+ RuntimeState* state, RowBatch* batch, OutputPartition* output_partition,
+ DmlExecState* dml_exec_state) {
+ // The rows of this batch may span multiple files. We repeatedly pass the
row batch to
+ // the writer until it sets new_file to false, indicating that all rows have
been
+ // written. The writer tracks where it is in the batch when it returns with
new_file
+ // set.
+ bool new_file;
+ while (true) {
+ RETURN_IF_ERROR(WriteRowsToFile(state, batch, output_partition, {},
&new_file));
+ if (!new_file) break;
+ RETURN_IF_ERROR(FinalizeDeletePartitionFile(state, output_partition,
dml_exec_state));
+ RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
+ }
+ return Status::OK();
+}
+
+Status TableSinkBase::WriteRowsToFile(
+ RuntimeState* state, RowBatch* batch, OutputPartition* output_partition,
+ const std::vector<int32_t>& indices, bool *new_file) {
+ Status status =
+ output_partition->writer->AppendRows(batch, indices, new_file);
+ if (!status.ok()) {
+ // IMPALA-10607: Deletes partition file if staging is skipped when
appending rows
+ // fails. Otherwise, it leaves the file in un-finalized state.
+ if (ShouldSkipStaging(state, output_partition)) {
+ status.MergeStatus(ClosePartitionFile(state, output_partition));
+ hdfsDelete(output_partition->hdfs_connection,
+ output_partition->current_file_name.c_str(), 0);
+ }
+ return status;
+ }
+ return Status::OK();
+}
+
void TableSinkBase::GetHashTblKey(const TupleRow* row,
const vector<ScalarExprEvaluator*>& evals, string* key) {
stringstream hash_table_key;
@@ -414,9 +438,23 @@ bool TableSinkBase::ShouldSkipStaging(RuntimeState* state,
OutputPartition* part
}
Status TableSinkBase::FinalizePartitionFile(
- RuntimeState* state, OutputPartition* partition, bool is_delete,
- DmlExecState* dml_exec_state) {
- if (dml_exec_state == nullptr) dml_exec_state = state->dml_exec_state();
+ RuntimeState* state, OutputPartition* partition) {
+ return FinalizePartitionFileImpl(
+ state, partition, /*is_delete=*/false, state->dml_exec_state());
+}
+
+Status TableSinkBase::FinalizeDeletePartitionFile(
+ RuntimeState* state, OutputPartition* partition, DmlExecState*
dml_exec_state) {
+ DCHECK(IsIceberg());
+ DCHECK(dml_exec_state != nullptr);
+ DCHECK_NE(dml_exec_state, state->dml_exec_state());
+
+ return FinalizePartitionFileImpl(state, partition, /*is_delete=*/true,
dml_exec_state);
+}
+
+Status TableSinkBase::FinalizePartitionFileImpl(RuntimeState* state,
+ OutputPartition* partition, bool is_delete, DmlExecState* dml_exec_state) {
+ DCHECK(dml_exec_state != nullptr);
if (partition->tmp_hdfs_file == nullptr && !is_overwrite()) return
Status::OK();
SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer"));
diff --git a/be/src/exec/table-sink-base.h b/be/src/exec/table-sink-base.h
index 4434599e8..78a56d561 100644
--- a/be/src/exec/table-sink-base.h
+++ b/be/src/exec/table-sink-base.h
@@ -126,8 +126,13 @@ protected:
/// Updates runtime stats of HDFS with rows written, then closes the file
associated
/// with the partition by calling ClosePartitionFile()
- Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition,
- bool is_delete = false, DmlExecState* dml_exec_state = nullptr)
WARN_UNUSED_RESULT;
+ Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition)
+ WARN_UNUSED_RESULT;
+
+ /// Same as above, but for delete files, as such table sinks have their own
+ /// DmlExecState.
+ Status FinalizeDeletePartitionFile(RuntimeState* state, OutputPartition*
partition,
+ DmlExecState* dml_exec_state) WARN_UNUSED_RESULT;
/// Writes all rows in 'batch' referenced by the row index vector in
'indices' to the
/// partition's writer. If 'indices' is empty, then it writes all rows in
'batch'.
@@ -136,6 +141,13 @@ protected:
const std::vector<int32_t>& indices = {})
WARN_UNUSED_RESULT;
+ /// Writes all rows to the partition's writer. It is only used for writing
delete files
+ /// as such table sinks have their own DmlExecState.
+ Status WriteDeleteRowsToPartition(
+ RuntimeState* state, RowBatch* batch, OutputPartition* partition,
+ DmlExecState* dml_exec_state)
+ WARN_UNUSED_RESULT;
+
/// Closes the hdfs file for this partition as well as the writer.
Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition)
WARN_UNUSED_RESULT;
@@ -204,6 +216,16 @@ protected:
RuntimeProfile::Counter* hdfs_write_timer_;
/// Time spent compressing data
RuntimeProfile::Counter* compress_timer_;
+
+private:
+ /// Writes rows to the partition's writer. Sets 'new_file' to true when it
cannot write
+ /// all rows to the current output file.
+ Status WriteRowsToFile(
+ RuntimeState* state, RowBatch* batch, OutputPartition* partition,
+ const std::vector<int32_t>& indices, bool *new_file) WARN_UNUSED_RESULT;
+
+ Status FinalizePartitionFileImpl(RuntimeState* state, OutputPartition*
partition,
+ bool is_delete, DmlExecState* dml_exec_state) WARN_UNUSED_RESULT;
};
}
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-delete-per-partition.test
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-delete-per-partition.test
new file mode 100644
index 000000000..bcb3ee5de
--- /dev/null
+++
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-delete-per-partition.test
@@ -0,0 +1,49 @@
+====
+---- QUERY
+# Regression test for IMPALA-14496 where a DELETE operation needs to write
+# multiple delete files per partition.
+CREATE TABLE multiple_deletes(
+ str STRING NULL,
+ year INT NULL,
+ last_modified TIMESTAMP NULL)
+PARTITIONED BY SPEC (year)
+STORED AS ICEBERG
+TBLPROPERTIES ('format-version'='2');
+
+INSERT INTO multiple_deletes SELECT string_col, year, timestamp_col FROM
functional_parquet.alltypes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+
+SELECT count(*) FROM multiple_deletes;
+---- RESULTS
+7475200
+---- TYPES
+BIGINT
+====
+---- QUERY
+SET parquet_file_size=8m;
+DELETE FROM multiple_deletes WHERE last_modified >= '2008-12-30';
+
+SELECT count(*) FROM multiple_deletes;
+---- RESULTS
+0
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Verify that the above DELETE statement wrote 4 delete files (2 per
partition) in total.
+SELECT count(*) FROM $DATABASE.multiple_deletes.`files`
+WHERE content = 1;
+---- RESULTS
+4
+---- TYPES
+BIGINT
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 2140e3fbf..527300261 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -2188,6 +2188,11 @@ class TestIcebergV2Table(IcebergTestSuite):
vector.unset_exec_option('num_nodes')
self.run_test_case('QueryTest/iceberg-merge-duplicate-check', vector,
unique_database)
+ def test_writing_multiple_deletes_per_partition(self, vector,
unique_database):
+ """Test writing multiple delete files partition in a single DELETE
operation."""
+ self.run_test_case('QueryTest/iceberg-multiple-delete-per-partition',
vector,
+ use_db=unique_database)
+
def test_cleanup(self, unique_database):
"""Test that all uncommitted files written by Impala are removed from
the file
system when a DML commit to an Iceberg table fails, and that the effects
of the