This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new bfae4d0b3 IMPALA-14496: Impala crashes when it writes multiple delete 
files per partition in a single DELETE operation
bfae4d0b3 is described below

commit bfae4d0b32dfb035495831f003f30b2d336d1ec6
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Tue Oct 14 10:11:15 2025 +0200

    IMPALA-14496: Impala crashes when it writes multiple delete files per 
partition in a single DELETE operation
    
    Impala crashes when it needs to write multiple delete files per
    partition in a single DELETE operation. It is because
    IcebergBufferedDeleteSink has its own DmlExecState object, but
    sometimes the methods in TableSinkBase use the RuntimeState's
    DmlExecState object. I.e. it can happen that we add a partition
    to the IcebergBufferedDeleteSink's DmlExecState, but later we
    expect to find it in the RuntimeState's DmlExecState.
    
    This patch adds new methods to TableSinkBase that are specific
    for writing delete files, and they always take a DmlExecState
    object as a parameter. They are now used by IcebergBufferedDeleteSink.
    
    Testing
     * added e2e tests
    
    Change-Id: I46266007a6356e9ff3b63369dd855aff1396bb72
    Reviewed-on: http://gerrit.cloudera.org:8080/23537
    Reviewed-by: Mihaly Szjatinya <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/iceberg-buffered-delete-sink.cc        |  7 ++-
 be/src/exec/table-sink-base.cc                     | 68 +++++++++++++++++-----
 be/src/exec/table-sink-base.h                      | 26 ++++++++-
 .../iceberg-multiple-delete-per-partition.test     | 49 ++++++++++++++++
 tests/query_test/test_iceberg.py                   |  5 ++
 5 files changed, 135 insertions(+), 20 deletions(-)

diff --git a/be/src/exec/iceberg-buffered-delete-sink.cc 
b/be/src/exec/iceberg-buffered-delete-sink.cc
index 6b34bb319..d38cb6fb4 100644
--- a/be/src/exec/iceberg-buffered-delete-sink.cc
+++ b/be/src/exec/iceberg-buffered-delete-sink.cc
@@ -270,11 +270,12 @@ Status 
IcebergBufferedDeleteSink::FlushBufferedRecords(RuntimeState* state) {
       row_batch.Reset();
       RETURN_IF_ERROR(GetNextRowBatch(&row_batch, &it));
       row_batch.VLogRows("IcebergBufferedDeleteSink");
-      RETURN_IF_ERROR(WriteRowsToPartition(state, &row_batch, 
current_partition_.get()));
+      RETURN_IF_ERROR(WriteDeleteRowsToPartition(state, &row_batch,
+          current_partition_.get(), &dml_exec_state_));
     }
     DCHECK(current_partition_ != nullptr);
-    RETURN_IF_ERROR(FinalizePartitionFile(state, current_partition_.get(),
-        /*is_delete=*/true, &dml_exec_state_));
+    RETURN_IF_ERROR(FinalizeDeletePartitionFile(state, 
current_partition_.get(),
+        &dml_exec_state_));
     current_partition_->writer->Close();
   }
   return Status::OK();
diff --git a/be/src/exec/table-sink-base.cc b/be/src/exec/table-sink-base.cc
index 598c50492..2b8334d1c 100644
--- a/be/src/exec/table-sink-base.cc
+++ b/be/src/exec/table-sink-base.cc
@@ -375,18 +375,7 @@ Status TableSinkBase::WriteRowsToPartition(
   // set.
   bool new_file;
   while (true) {
-    Status status =
-        output_partition->writer->AppendRows(batch, indices, &new_file);
-    if (!status.ok()) {
-      // IMPALA-10607: Deletes partition file if staging is skipped when 
appending rows
-      // fails. Otherwise, it leaves the file in un-finalized state.
-      if (ShouldSkipStaging(state, output_partition)) {
-        status.MergeStatus(ClosePartitionFile(state, output_partition));
-        hdfsDelete(output_partition->hdfs_connection,
-            output_partition->current_file_name.c_str(), 0);
-      }
-      return status;
-    }
+    RETURN_IF_ERROR(WriteRowsToFile(state, batch, output_partition, indices, 
&new_file));
     if (!new_file) break;
     RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition));
     RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
@@ -394,6 +383,41 @@ Status TableSinkBase::WriteRowsToPartition(
   return Status::OK();
 }
 
+Status TableSinkBase::WriteDeleteRowsToPartition(
+    RuntimeState* state, RowBatch* batch, OutputPartition* output_partition,
+    DmlExecState* dml_exec_state) {
+  // The rows of this batch may span multiple files. We repeatedly pass the 
row batch to
+  // the writer until it sets new_file to false, indicating that all rows have 
been
+  // written. The writer tracks where it is in the batch when it returns with 
new_file
+  // set.
+  bool new_file;
+  while (true) {
+    RETURN_IF_ERROR(WriteRowsToFile(state, batch, output_partition, {}, 
&new_file));
+    if (!new_file) break;
+    RETURN_IF_ERROR(FinalizeDeletePartitionFile(state, output_partition, 
dml_exec_state));
+    RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition));
+  }
+  return Status::OK();
+}
+
+Status TableSinkBase::WriteRowsToFile(
+    RuntimeState* state, RowBatch* batch, OutputPartition* output_partition,
+    const std::vector<int32_t>& indices, bool *new_file) {
+  Status status =
+      output_partition->writer->AppendRows(batch, indices, new_file);
+  if (!status.ok()) {
+    // IMPALA-10607: Deletes partition file if staging is skipped when 
appending rows
+    // fails. Otherwise, it leaves the file in un-finalized state.
+    if (ShouldSkipStaging(state, output_partition)) {
+      status.MergeStatus(ClosePartitionFile(state, output_partition));
+      hdfsDelete(output_partition->hdfs_connection,
+          output_partition->current_file_name.c_str(), 0);
+    }
+    return status;
+  }
+  return Status::OK();
+}
+
 void TableSinkBase::GetHashTblKey(const TupleRow* row,
     const vector<ScalarExprEvaluator*>& evals, string* key) {
   stringstream hash_table_key;
@@ -414,9 +438,23 @@ bool TableSinkBase::ShouldSkipStaging(RuntimeState* state, 
OutputPartition* part
 }
 
 Status TableSinkBase::FinalizePartitionFile(
-    RuntimeState* state, OutputPartition* partition, bool is_delete,
-    DmlExecState* dml_exec_state) {
-  if (dml_exec_state == nullptr) dml_exec_state = state->dml_exec_state();
+    RuntimeState* state, OutputPartition* partition) {
+  return FinalizePartitionFileImpl(
+      state, partition, /*is_delete=*/false, state->dml_exec_state());
+}
+
+Status TableSinkBase::FinalizeDeletePartitionFile(
+    RuntimeState* state, OutputPartition* partition, DmlExecState* 
dml_exec_state) {
+  DCHECK(IsIceberg());
+  DCHECK(dml_exec_state != nullptr);
+  DCHECK_NE(dml_exec_state, state->dml_exec_state());
+
+  return FinalizePartitionFileImpl(state, partition, /*is_delete=*/true, 
dml_exec_state);
+}
+
+Status TableSinkBase::FinalizePartitionFileImpl(RuntimeState* state,
+    OutputPartition* partition, bool is_delete, DmlExecState* dml_exec_state) {
+  DCHECK(dml_exec_state != nullptr);
   if (partition->tmp_hdfs_file == nullptr && !is_overwrite()) return 
Status::OK();
   SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer"));
 
diff --git a/be/src/exec/table-sink-base.h b/be/src/exec/table-sink-base.h
index 4434599e8..78a56d561 100644
--- a/be/src/exec/table-sink-base.h
+++ b/be/src/exec/table-sink-base.h
@@ -126,8 +126,13 @@ protected:
 
   /// Updates runtime stats of HDFS with rows written, then closes the file 
associated
   /// with the partition by calling ClosePartitionFile()
-  Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition,
-      bool is_delete = false, DmlExecState* dml_exec_state = nullptr) 
WARN_UNUSED_RESULT;
+  Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition)
+      WARN_UNUSED_RESULT;
+
+  /// Same as above, but for delete files, as such table sinks have their own
+  /// DmlExecState.
+  Status FinalizeDeletePartitionFile(RuntimeState* state, OutputPartition* 
partition,
+      DmlExecState* dml_exec_state) WARN_UNUSED_RESULT;
 
   /// Writes all rows in 'batch' referenced by the row index vector in 
'indices' to the
   /// partition's writer. If 'indices' is empty, then it writes all rows in 
'batch'.
@@ -136,6 +141,13 @@ protected:
       const std::vector<int32_t>& indices = {})
       WARN_UNUSED_RESULT;
 
+  /// Writes all rows to the partition's writer. It is only used for writing 
delete files
+  /// as such table sinks have their own DmlExecState.
+  Status WriteDeleteRowsToPartition(
+      RuntimeState* state, RowBatch* batch, OutputPartition* partition,
+      DmlExecState* dml_exec_state)
+      WARN_UNUSED_RESULT;
+
   /// Closes the hdfs file for this partition as well as the writer.
   Status ClosePartitionFile(RuntimeState* state, OutputPartition* partition)
       WARN_UNUSED_RESULT;
@@ -204,6 +216,16 @@ protected:
   RuntimeProfile::Counter* hdfs_write_timer_;
   /// Time spent compressing data
   RuntimeProfile::Counter* compress_timer_;
+
+private:
+  /// Writes rows to the partition's writer. Sets 'new_file' to true when it 
cannot write
+  /// all rows to the current output file.
+  Status WriteRowsToFile(
+      RuntimeState* state, RowBatch* batch, OutputPartition* partition,
+      const std::vector<int32_t>& indices, bool *new_file) WARN_UNUSED_RESULT;
+
+  Status FinalizePartitionFileImpl(RuntimeState* state, OutputPartition* 
partition,
+      bool is_delete, DmlExecState* dml_exec_state) WARN_UNUSED_RESULT;
 };
 
 }
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-delete-per-partition.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-delete-per-partition.test
new file mode 100644
index 000000000..bcb3ee5de
--- /dev/null
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-multiple-delete-per-partition.test
@@ -0,0 +1,49 @@
+====
+---- QUERY
+# Regression test for IMPALA-14496 where a DELETE operation needs to write
+# multiple delete files per partition.
+CREATE TABLE multiple_deletes(
+  str STRING NULL,
+  year INT NULL,
+  last_modified TIMESTAMP NULL)
+PARTITIONED BY SPEC (year)
+STORED AS ICEBERG
+TBLPROPERTIES ('format-version'='2');
+
+INSERT INTO multiple_deletes SELECT string_col, year, timestamp_col FROM 
functional_parquet.alltypes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+INSERT INTO multiple_deletes SELECT * FROM multiple_deletes;
+
+SELECT count(*) FROM multiple_deletes;
+---- RESULTS
+7475200
+---- TYPES
+BIGINT
+====
+---- QUERY
+SET parquet_file_size=8m;
+DELETE FROM multiple_deletes WHERE last_modified >= '2008-12-30';
+
+SELECT count(*) FROM multiple_deletes;
+---- RESULTS
+0
+---- TYPES
+BIGINT
+====
+---- QUERY
+# Verify that the above DELETE statement wrote 4 delete files (2 per 
partition) in total.
+SELECT count(*) FROM $DATABASE.multiple_deletes.`files`
+WHERE content = 1;
+---- RESULTS
+4
+---- TYPES
+BIGINT
+====
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 2140e3fbf..527300261 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -2188,6 +2188,11 @@ class TestIcebergV2Table(IcebergTestSuite):
     vector.unset_exec_option('num_nodes')
     self.run_test_case('QueryTest/iceberg-merge-duplicate-check', vector, 
unique_database)
 
+  def test_writing_multiple_deletes_per_partition(self, vector, 
unique_database):
+    """Test writing multiple delete files per partition in a single DELETE 
operation."""
+    self.run_test_case('QueryTest/iceberg-multiple-delete-per-partition', 
vector,
+        use_db=unique_database)
+
   def test_cleanup(self, unique_database):
       """Test that all uncommitted files written by Impala are removed from 
the file
       system when a DML commit to an Iceberg table fails, and that the effects 
of the

Reply via email to