This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4428db37b3884373482071fc918936b0c080e47c
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Fri Mar 1 20:49:11 2024 +0100

    IMPALA-12860: Invoke validateDataFilesExist for RowDelta operations
    
    We must invoke validateDataFilesExist for RowDelta operations (DELETE/
    UPDATE/MERGE). Without this a concurrent RewriteFiles (compaction) and
    RowDelta can corrupt a table.
    
    IcebergBufferedDeleteSink now also collects the filenames of the data
    files that are referenced in the position delete files. It adds them to
    the DML exec state which is then collected by the Coordinator. The
    Coordinator passes the file paths to CatalogD which executes Iceberg's
    RowDelta operation and now invokes validateDataFilesExist() with the
    file paths. Additionally it also invokes validateDeletedFiles().
    
    This patch set also resolves IMPALA-12640 which is about replacing
    IcebergDeleteSink with IcebergBufferedDeleteSink, as from now on
    we use the buffered version for all DML operations that write
    position delete files.
    
    Testing:
     * adds new stress test with DELETE + UPDATE + OPTIMIZE
    
    Change-Id: I4869eb863ff0afe8f691ccf2fd681a92d36b405c
    Reviewed-on: http://gerrit.cloudera.org:8080/21099
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Gabor Kaszab <[email protected]>
---
 be/src/exec/CMakeLists.txt                         |   1 -
 be/src/exec/iceberg-buffered-delete-sink.cc        |  18 ++
 be/src/exec/iceberg-buffered-delete-sink.h         |   3 +
 be/src/exec/iceberg-delete-sink-config.cc          |  12 +-
 be/src/exec/iceberg-delete-sink.cc                 | 245 ---------------------
 be/src/exec/iceberg-delete-sink.h                  |  93 --------
 be/src/exec/multi-table-sink.cc                    |   3 +-
 be/src/runtime/dml-exec-state.cc                   |   7 +
 be/src/runtime/dml-exec-state.h                    |  21 ++
 be/src/service/client-request-state.cc             |   4 +
 common/protobuf/control_service.proto              |   4 +
 common/thrift/CatalogService.thrift                |   3 +
 common/thrift/DataSinks.thrift                     |   4 +-
 .../apache/impala/analysis/IcebergDeleteImpl.java  |   5 +-
 .../impala/planner/IcebergBufferedDeleteSink.java  |   1 -
 .../apache/impala/planner/IcebergDeleteSink.java   | 145 ------------
 .../java/org/apache/impala/planner/TableSink.java  |   5 +-
 .../impala/service/IcebergCatalogOpExecutor.java   |   8 +-
 .../queries/PlannerTest/iceberg-v2-delete.test     |  88 +++-----
 tests/stress/test_update_stress.py                 |  73 ++++--
 20 files changed, 163 insertions(+), 580 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index e6561045e..a9c5c61de 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -81,7 +81,6 @@ add_library(Exec
   iceberg-buffered-delete-sink.cc
   iceberg-delete-builder.cc
   iceberg-delete-node.cc
-  iceberg-delete-sink.cc
   iceberg-delete-sink-base.cc
   iceberg-delete-sink-config.cc
   incr-stats-util.cc
diff --git a/be/src/exec/iceberg-buffered-delete-sink.cc 
b/be/src/exec/iceberg-buffered-delete-sink.cc
index 4743d4065..b7fdfd08c 100644
--- a/be/src/exec/iceberg-buffered-delete-sink.cc
+++ b/be/src/exec/iceberg-buffered-delete-sink.cc
@@ -385,9 +385,27 @@ Status IcebergBufferedDeleteSink::FlushFinal(RuntimeState* 
state) {
   VLogBufferedRecords();
   RETURN_IF_ERROR(VerifyBufferedRecords());
   RETURN_IF_ERROR(FlushBufferedRecords(state));
+  RegisterDataFilesInDmlExecState();
   return Status::OK();
 }
 
+void IcebergBufferedDeleteSink::RegisterDataFilesInDmlExecState() {
+  int capacity = 0;
+  for (const auto& entry : partitions_to_file_positions_) {
+    const FilePositions& file_positions = entry.second;
+    capacity += file_positions.size();
+  }
+  dml_exec_state_.reserveReferencedDataFiles(capacity);
+  for (const auto& entry : partitions_to_file_positions_) {
+    const FilePositions& file_positions = entry.second;
+    for (const auto& file_pos_entry : file_positions) {
+      const StringValue& sv = file_pos_entry.first;
+      string filepath(sv.Ptr(), sv.Len());
+      dml_exec_state_.addReferencedDataFile(std::move(filepath));
+    }
+  }
+}
+
 void IcebergBufferedDeleteSink::Close(RuntimeState* state) {
   if (closed_) return;
   SCOPED_TIMER(profile()->total_time_counter());
diff --git a/be/src/exec/iceberg-buffered-delete-sink.h 
b/be/src/exec/iceberg-buffered-delete-sink.h
index fd51bf6c2..bdf3920ae 100644
--- a/be/src/exec/iceberg-buffered-delete-sink.h
+++ b/be/src/exec/iceberg-buffered-delete-sink.h
@@ -90,6 +90,9 @@ class IcebergBufferedDeleteSink : public 
IcebergDeleteSinkBase {
   /// Writes all buffered delete records to position delete files.
   Status FlushBufferedRecords(RuntimeState* state);
 
+  /// Registers the referenced data files in dml_exec_state_
+  void RegisterDataFilesInDmlExecState();
+
   /// Initializes an empty output batch.
   Status InitializeOutputRowBatch(RowBatch* batch);
 
diff --git a/be/src/exec/iceberg-delete-sink-config.cc 
b/be/src/exec/iceberg-delete-sink-config.cc
index 0ab358915..6cf93e833 100644
--- a/be/src/exec/iceberg-delete-sink-config.cc
+++ b/be/src/exec/iceberg-delete-sink-config.cc
@@ -20,7 +20,6 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "exec/iceberg-buffered-delete-sink.h"
-#include "exec/iceberg-delete-sink.h"
 #include "exprs/scalar-expr.h"
 #include "runtime/mem-pool.h"
 
@@ -28,13 +27,8 @@ namespace impala {
 
 DataSink* IcebergDeleteSinkConfig::CreateSink(RuntimeState* state) const {
   TDataSinkId sink_id = state->fragment().idx;
-  if (this->tsink_->table_sink.iceberg_delete_sink.is_buffered) {
-    return state->obj_pool()->Add(
-      new IcebergBufferedDeleteSink(sink_id, *this, state));
-  } else {
-    return state->obj_pool()->Add(
-      new IcebergDeleteSink(sink_id, *this, state));
-  }
+  return state->obj_pool()->Add(
+    new IcebergBufferedDeleteSink(sink_id, *this, state));
 }
 
 Status IcebergDeleteSinkConfig::Init(
@@ -48,4 +42,4 @@ Status IcebergDeleteSinkConfig::Init(
   return Status::OK();
 }
 
-}
\ No newline at end of file
+}
diff --git a/be/src/exec/iceberg-delete-sink.cc 
b/be/src/exec/iceberg-delete-sink.cc
deleted file mode 100644
index 657d75869..000000000
--- a/be/src/exec/iceberg-delete-sink.cc
+++ /dev/null
@@ -1,245 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/iceberg-delete-sink.h"
-
-#include "common/object-pool.h"
-#include "exec/iceberg-delete-sink-config.h"
-#include "exec/parquet/hdfs-parquet-table-writer.h"
-#include "exprs/scalar-expr.h"
-#include "exprs/scalar-expr-evaluator.h"
-#include "runtime/descriptors.h"
-#include "runtime/hdfs-fs-cache.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "util/coding-util.h"
-#include "util/hdfs-util.h"
-#include "util/impalad-metrics.h"
-#include "util/metrics.h"
-#include "util/runtime-profile-counters.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-IcebergDeleteSink::IcebergDeleteSink(TDataSinkId sink_id,
-    const IcebergDeleteSinkConfig& sink_config, RuntimeState* state) :
-    IcebergDeleteSinkBase(sink_id, sink_config, "IcebergDeleteSink", state) {
-}
-
-Status IcebergDeleteSink::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
-  SCOPED_TIMER(profile()->total_time_counter());
-  RETURN_IF_ERROR(IcebergDeleteSinkBase::Prepare(state, parent_mem_tracker));
-  return Status::OK();
-}
-
-Status IcebergDeleteSink::Open(RuntimeState* state) {
-  SCOPED_TIMER(profile()->total_time_counter());
-  RETURN_IF_ERROR(IcebergDeleteSinkBase::Open(state));
-  return Status::OK();
-}
-
-Status IcebergDeleteSink::Send(RuntimeState* state, RowBatch* batch) {
-  SCOPED_TIMER(profile()->total_time_counter());
-  expr_results_pool_->Clear();
-  RETURN_IF_ERROR(state->CheckQueryState());
-  // We don't do any work for an empty batch.
-  if (batch->num_rows() == 0) return Status::OK();
-
-  RETURN_IF_ERROR(VerifyRowsNotDuplicated(batch));
-
-  // If there are no partition keys then just pass the whole batch to one 
partition.
-  if (dynamic_partition_key_expr_evals_.empty()) {
-    if (current_partition_.first == nullptr) {
-      RETURN_IF_ERROR(SetCurrentPartition(state, nullptr, ROOT_PARTITION_KEY));
-    }
-    DCHECK(current_partition_.second.empty());
-    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, 
current_partition_.first.get()));
-  } else {
-    RETURN_IF_ERROR(WriteClusteredRowBatch(state, batch));
-  }
-  return Status::OK();
-}
-
-Status IcebergDeleteSink::VerifyRowsNotDuplicated(RowBatch* batch) {
-  DCHECK_EQ(output_exprs_.size(), 2);
-  DCHECK_EQ(output_expr_evals_.size(), 2);
-
-  ScalarExpr* filepath_expr = output_exprs_[0];
-  ScalarExpr* position_expr = output_exprs_[1];
-  DCHECK(filepath_expr->type().IsStringType());
-  DCHECK(position_expr->type().IsIntegerType());
-
-  ScalarExprEvaluator* filepath_eval = output_expr_evals_[0];
-  ScalarExprEvaluator* position_eval = output_expr_evals_[1];
-  for (int i = 0; i < batch->num_rows(); ++i) {
-    TupleRow* row = batch->GetRow(i);
-    StringVal filepath_sv = filepath_eval->GetStringVal(row);
-    DCHECK(!filepath_sv.is_null);
-    BigIntVal position_bi = position_eval->GetBigIntVal(row);
-    DCHECK(!position_bi.is_null);
-    string filepath(reinterpret_cast<char*>(filepath_sv.ptr), filepath_sv.len);
-    int64_t position = position_bi.val;
-    if (prev_file_path_ == filepath && prev_position_ == position) {
-      return Status(Substitute("Duplicated row in DELETE sink. file_path='$0', 
pos='$1'. "
-          "If this is coming from an UPDATE statement with a JOIN, please 
check if there "
-          "multiple matches in the JOIN condition.", filepath, position));
-    }
-    prev_file_path_ = filepath;
-    prev_position_ = position;
-  }
-  return Status::OK();
-}
-
-inline Status IcebergDeleteSink::SetCurrentPartition(RuntimeState* state,
-    const TupleRow* row, const string& key) {
-  DCHECK(row != nullptr || key == ROOT_PARTITION_KEY);
-  if (current_partition_.first != nullptr &&
-      key == current_clustered_partition_key_) {
-    return Status::OK();
-  }
-
-  current_partition_.first.reset(new OutputPartition());
-  current_partition_.second.clear();
-  // Build the unique name for this partition from the partition keys, e.g. 
"j=1/f=foo/"
-  // etc.
-  RETURN_IF_ERROR(ConstructPartitionInfo(row, current_partition_.first.get()));
-  Status status = InitOutputPartition(state, *prototype_partition_,
-      current_partition_.first.get(), false);
-  if (!status.ok()) {
-    // We failed to create the output partition successfully. Clean it up now.
-    if (current_partition_.first->writer != nullptr) {
-      current_partition_.first->writer->Close();
-    }
-    return status;
-  }
-
-  // Save the partition name so that the coordinator can create the partition
-  // directory structure if needed.
-  state->dml_exec_state()->AddPartition(
-      current_partition_.first->partition_name, prototype_partition_->id(),
-      &table_desc_->hdfs_base_dir(),
-      nullptr);
-  return Status::OK();
-}
-
-Status IcebergDeleteSink::WriteClusteredRowBatch(RuntimeState* state, 
RowBatch* batch) {
-  DCHECK_GT(batch->num_rows(), 0);
-  DCHECK_EQ(partition_key_expr_evals_.size(), 2);
-  DCHECK(!dynamic_partition_key_expr_evals_.empty());
-
-  // Initialize the clustered partition and key.
-  if (current_partition_.first == nullptr) {
-    TupleRow* current_row = batch->GetRow(0);
-    GetHashTblKey(current_row, dynamic_partition_key_expr_evals_,
-        &current_clustered_partition_key_);
-    RETURN_IF_ERROR(SetCurrentPartition(state, current_row,
-        current_clustered_partition_key_));
-  }
-
-  // Compare the last row of the batch to the last current partition key. If 
they match,
-  // then all the rows in the batch have the same key and can be written as a 
whole.
-  string last_row_key;
-  GetHashTblKey(batch->GetRow(batch->num_rows() - 1),
-      dynamic_partition_key_expr_evals_, &last_row_key);
-  if (last_row_key == current_clustered_partition_key_) {
-    DCHECK(current_partition_.second.empty());
-    RETURN_IF_ERROR(WriteRowsToPartition(state, batch, 
current_partition_.first.get()));
-    return Status::OK();
-  }
-
-  // Not all rows in this batch match the previously written partition key, so 
we process
-  // them individually.
-  for (int i = 0; i < batch->num_rows(); ++i) {
-    TupleRow* current_row = batch->GetRow(i);
-
-    string key;
-    GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, &key);
-
-    if (current_clustered_partition_key_ != key) {
-      DCHECK(current_partition_.first->writer != nullptr);
-      // Done with previous partition - write rows and close.
-      if (!current_partition_.second.empty()) {
-        RETURN_IF_ERROR(WriteRowsToPartition(state, batch, 
current_partition_.first.get(),
-            current_partition_.second));
-        current_partition_.second.clear();
-      }
-      RETURN_IF_ERROR(FinalizePartitionFile(state,
-          current_partition_.first.get(), /*is_delete=*/true));
-      if (current_partition_.first->writer.get() != nullptr) {
-        current_partition_.first->writer->Close();
-      }
-      RETURN_IF_ERROR(SetCurrentPartition(state, current_row, key));
-      current_clustered_partition_key_ = std::move(key);
-    }
-#ifdef DEBUG
-    string debug_row_key;
-    GetHashTblKey(current_row, dynamic_partition_key_expr_evals_, 
&debug_row_key);
-    DCHECK_EQ(current_clustered_partition_key_, debug_row_key);
-#endif
-    DCHECK(current_partition_.first->writer != nullptr);
-    current_partition_.second.push_back(i);
-  }
-  // Write final set of rows to the partition but keep its file open.
-  RETURN_IF_ERROR(WriteRowsToPartition(state, batch, 
current_partition_.first.get(),
-      current_partition_.second));
-  current_partition_.second.clear();
-  return Status::OK();
-}
-
-Status IcebergDeleteSink::FlushFinal(RuntimeState* state) {
-  DCHECK(!closed_);
-  SCOPED_TIMER(profile()->total_time_counter());
-
-  if (current_partition_.first != nullptr) {
-    RETURN_IF_ERROR(FinalizePartitionFile(state, 
current_partition_.first.get(),
-        /*is_delete=*/true));
-  }
-  return Status::OK();
-}
-
-void IcebergDeleteSink::Close(RuntimeState* state) {
-  if (closed_) return;
-  SCOPED_TIMER(profile()->total_time_counter());
-
-  if (current_partition_.first != nullptr) {
-    if (current_partition_.first->writer != nullptr) {
-      current_partition_.first->writer->Close();
-    }
-    Status close_status = ClosePartitionFile(state, 
current_partition_.first.get());
-    if (!close_status.ok()) state->LogError(close_status.msg());
-  }
-
-  current_partition_.first.reset();
-  IcebergDeleteSinkBase::Close(state);
-  DCHECK(closed_);
-}
-
-string IcebergDeleteSink::DebugString() const {
-  stringstream out;
-  out << "IcebergDeleteSink("
-      << " table_desc=" << table_desc_->DebugString()
-      << " output_exprs=" << ScalarExpr::DebugString(output_exprs_);
-  if (!partition_key_exprs_.empty()) {
-    out << " partition_key_exprs=" << 
ScalarExpr::DebugString(partition_key_exprs_);
-  }
-  out << ")";
-  return out.str();
-}
-
-} // namespace impala
diff --git a/be/src/exec/iceberg-delete-sink.h 
b/be/src/exec/iceberg-delete-sink.h
deleted file mode 100644
index 9be88d294..000000000
--- a/be/src/exec/iceberg-delete-sink.h
+++ /dev/null
@@ -1,93 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "exec/iceberg-delete-sink-base.h"
-#include "exec/output-partition.h"
-#include "exec/table-sink-base.h"
-
-#include <unordered_map>
-
-namespace impala {
-
-class Expr;
-class IcebergDeleteSinkConfig;
-class TupleDescriptor;
-class TupleRow;
-class MemTracker;
-
-class IcebergDeleteSink : public IcebergDeleteSinkBase {
- public:
-  IcebergDeleteSink(TDataSinkId sink_id, const IcebergDeleteSinkConfig& 
sink_config,
-    RuntimeState* state);
-
-  /// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
-  Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
-
-  /// Opens output_exprs and partition_key_exprs.
-  Status Open(RuntimeState* state) override;
-
-  /// Append all rows in batch to position delete files. It is assumed that
-  /// that rows are ordered by partitions, filepaths, and positions.
-  Status Send(RuntimeState* state, RowBatch* batch) override;
-
-  /// Finalize any open files.
-  /// TODO: IMPALA-2988: Move calls to functions that can fail in Close() to 
FlushFinal()
-  Status FlushFinal(RuntimeState* state) override;
-
-  /// Closes writers, output_exprs and partition_key_exprs and releases 
resources.
-  void Close(RuntimeState* state) override;
-
-  std::string DebugString() const override;
-
- private:
-  /// Verifies that the row batch does not contain duplicated rows. This can 
only happen
-  /// in the context of UPDATE FROM statements when we are updating a table 
based on
-  /// another table, e.g.:
-  /// UPDATE t SET t.x = s.x FROM ice_t t, source_tbl s where t.id = s.id;
-  /// Now, if 'source_tbl' has duplicate rows then the JOIN operator would 
produce
-  /// multiple matches for the same row, and we would insert them to the table.
-  /// Therefore, we should always raise an error if we find duplicated rows 
(i.e rows
-  /// having the same filepath + position), because that would corrupt the 
table data
-  /// and the delete files as well.
-  /// For a case where deduplication is not possible at the sink level, see 
the comment
-  /// in IcebergUpdateImpl.buildAndValidateSelectExprs() in the Frontend Java 
code.
-  Status VerifyRowsNotDuplicated(RowBatch* batch);
-
-  /// Maps all rows in 'batch' to partitions and appends them to their 
temporary Hdfs
-  /// files. The input must be ordered by the partition key expressions.
-  Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) 
WARN_UNUSED_RESULT;
-
-  /// Sets and initializes the 'current_partition_' based on key. For 
unpartitioned tables
-  /// it is only invoked once to initialize the only output partition.
-  /// For partitioned tables the rows are clustered based on partition data, 
i.e. when the
-  /// key changes we initialize a new output partition.
-  Status SetCurrentPartition(RuntimeState* state, const TupleRow* row,
-      const std::string& key) WARN_UNUSED_RESULT;
-
-  /// The sink writes partitions one-by-one.
-  PartitionPair current_partition_;
-
-  /// Variables necessary for validating that row batches don't contain 
duplicates.
-  std::string prev_file_path_;
-  int64_t prev_position_ = -1;
-};
-
-}
-
-
diff --git a/be/src/exec/multi-table-sink.cc b/be/src/exec/multi-table-sink.cc
index 3dd4b44c0..c37e9cf2d 100644
--- a/be/src/exec/multi-table-sink.cc
+++ b/be/src/exec/multi-table-sink.cc
@@ -17,7 +17,6 @@
 
 #include "common/object-pool.h"
 #include "exec/hdfs-table-sink.h"
-#include "exec/iceberg-delete-sink.h"
 #include "exec/multi-table-sink.h"
 #include "runtime/fragment-state.h"
 #include "runtime/runtime-state.h"
@@ -95,4 +94,4 @@ void MultiTableSink::Close(RuntimeState* state) {
   DCHECK(closed_);
 }
 
-}
\ No newline at end of file
+}
diff --git a/be/src/runtime/dml-exec-state.cc b/be/src/runtime/dml-exec-state.cc
index acf88ccdf..49e7f05bc 100644
--- a/be/src/runtime/dml-exec-state.cc
+++ b/be/src/runtime/dml-exec-state.cc
@@ -133,6 +133,10 @@ void DmlExecState::Update(const DmlExecStatusPB& 
dml_exec_status) {
       files_to_move_[file.staging_path()] = file.final_path();
     }
   }
+  data_files_referenced_by_position_deletes_.insert(
+      data_files_referenced_by_position_deletes_.end(),
+      dml_exec_status.data_files_referenced_by_position_deletes().begin(),
+      dml_exec_status.data_files_referenced_by_position_deletes().end());
 }
 
 uint64_t DmlExecState::GetKuduLatestObservedTimestamp() {
@@ -436,6 +440,9 @@ void DmlExecState::ToProto(DmlExecStatusPB* dml_status) {
   for (const PartitionStatusMap::value_type& part : per_partition_status_) {
     (*dml_status->mutable_per_partition_status())[part.first] = part.second;
   }
+  *dml_status->mutable_data_files_referenced_by_position_deletes() =
+      {data_files_referenced_by_position_deletes_.begin(),
+      data_files_referenced_by_position_deletes_.end()};
 }
 
 void DmlExecState::ToTDmlResult(TDmlResult* dml_result) {
diff --git a/be/src/runtime/dml-exec-state.h b/be/src/runtime/dml-exec-state.h
index da9f4ff3d..9c6a4e87f 100644
--- a/be/src/runtime/dml-exec-state.h
+++ b/be/src/runtime/dml-exec-state.h
@@ -20,6 +20,7 @@
 #include <map>
 #include <mutex>
 #include <string>
+#include <vector>
 #include <boost/unordered_map.hpp>
 
 #include "common/hdfs.h"
@@ -131,6 +132,22 @@ class DmlExecState {
   // Encodes delete file list info in flatbuffer format expected by Iceberg 
API.
   std::vector<std::string> CreateIcebergDeleteFilesVector();
 
+  // Returns vector of Iceberg data files referenced by position delete 
records by
+  // this DML statement.
+  const std::vector<std::string>& DataFilesReferencedByPositionDeletes() const 
{
+    return data_files_referenced_by_position_deletes_;
+  }
+
+  // Reserves capacity for 'data_files_referenced_by_position_deletes_'.
+  void reserveReferencedDataFiles(int capacity) {
+    data_files_referenced_by_position_deletes_.reserve(capacity);
+  }
+
+  // Adds file_path to the list of data files referenced by position delete 
records.
+  void addReferencedDataFile(std::string&& file_path) {
+    
data_files_referenced_by_position_deletes_.emplace_back(std::move(file_path));
+  }
+
  private:
   /// Auxiliary function used by 'AddCreatedFile' and 'AddCreatedDeleteFile'.
   void AddFileAux(const OutputPartition& partition, bool is_iceberg,
@@ -152,6 +169,10 @@ class DmlExecState {
   typedef std::map<std::string, std::string> FileMoveMap;
   FileMoveMap files_to_move_;
 
+  /// In case of Iceberg modify statements it contains the data files 
referenced
+  /// by position delete records.
+  std::vector<std::string> data_files_referenced_by_position_deletes_;
+
   /// Determines what the permissions of directories created by INSERT 
statements should
   /// be if permission inheritance is enabled. Populates a map from all 
prefixes of
   /// 'path_str' (including the full path itself) which is a path in Hdfs, to 
pairs
diff --git a/be/src/service/client-request-state.cc 
b/be/src/service/client-request-state.cc
index d68590125..26f6c1139 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -1630,12 +1630,16 @@ bool ClientRequestState::CreateIcebergCatalogOps(
   } else if (ice_finalize_params.operation == TIcebergOperation::DELETE) {
     cat_ice_op->__set_iceberg_delete_files_fb(
         dml_exec_state->CreateIcebergDeleteFilesVector());
+    cat_ice_op->__set_data_files_referenced_by_position_deletes(
+        dml_exec_state->DataFilesReferencedByPositionDeletes());
     if (cat_ice_op->iceberg_delete_files_fb.empty()) update_catalog = false;
   } else if (ice_finalize_params.operation == TIcebergOperation::UPDATE) {
     cat_ice_op->__set_iceberg_data_files_fb(
         dml_exec_state->CreateIcebergDataFilesVector());
     cat_ice_op->__set_iceberg_delete_files_fb(
         dml_exec_state->CreateIcebergDeleteFilesVector());
+    cat_ice_op->__set_data_files_referenced_by_position_deletes(
+        dml_exec_state->DataFilesReferencedByPositionDeletes());
     if (cat_ice_op->iceberg_delete_files_fb.empty()) {
       DCHECK(cat_ice_op->iceberg_data_files_fb.empty());
       update_catalog = false;
diff --git a/common/protobuf/control_service.proto 
b/common/protobuf/control_service.proto
index 78d4222ab..02933b6f2 100644
--- a/common/protobuf/control_service.proto
+++ b/common/protobuf/control_service.proto
@@ -109,6 +109,10 @@ message DmlExecStatusPB {
   // root's key in an unpartitioned table being ROOT_PARTITION_KEY.
   // The target table name is recorded in the corresponding TQueryExecRequest
   map<string, DmlPartitionStatusPB> per_partition_status = 1;
+
+  // In case of Iceberg modify statements it contains the data files referenced
+  // by position delete records.
+  repeated string data_files_referenced_by_position_deletes = 2;
 }
 
 // Error message exchange format
diff --git a/common/thrift/CatalogService.thrift 
b/common/thrift/CatalogService.thrift
index 1e4e9f5ea..e8d2f7647 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -244,6 +244,9 @@ struct TIcebergOperationParam {
 
   // The snapshot id when the operation was started
   4: optional i64 initial_snapshot_id;
+
+  // The data files referenced by the position delete files.
+  7: optional list<string> data_files_referenced_by_position_deletes
 }
 
 // Per-partion info needed by Catalog to handle an INSERT.
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 90ec3e29a..3033ab595 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -106,13 +106,11 @@ struct THdfsTableSink {
 }
 
 // Structure to encapsulate specific options that are passed down to the
-// IcebergDeleteSink.
+// IcebergBufferedDeleteSink.
 struct TIcebergDeleteSink {
   // Partition expressions of this sink. In case of Iceberg DELETEs these are 
the
   // partition spec id and the serialized partition data.
   1: required list<Exprs.TExpr> partition_key_exprs
-  // True if we are using the buffered delete sink.
-  2: required bool is_buffered = false
 }
 
 // Structure to encapsulate specific options that are passed down to the 
KuduTableSink
diff --git a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java 
b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
index 6bc51d66f..d1138d00f 100644
--- a/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
+++ b/fe/src/main/java/org/apache/impala/analysis/IcebergDeleteImpl.java
@@ -23,7 +23,7 @@ import org.apache.impala.catalog.FeIcebergTable;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.planner.DataSink;
-import org.apache.impala.planner.IcebergDeleteSink;
+import org.apache.impala.planner.IcebergBufferedDeleteSink;
 import org.apache.impala.planner.TableSink;
 import org.apache.impala.thrift.TSortingOrder;
 
@@ -65,7 +65,6 @@ public class IcebergDeleteImpl extends IcebergModifyImpl {
     deleteResultExprs_ = getDeleteResultExprs(analyzer);
     selectList.addAll(ExprUtil.exprsAsSelectList(deletePartitionKeyExprs_));
     selectList.addAll(ExprUtil.exprsAsSelectList(deleteResultExprs_));
-    sortExprs_.addAll(deleteResultExprs_);
   }
 
   @Override
@@ -80,7 +79,7 @@ public class IcebergDeleteImpl extends IcebergModifyImpl {
   @Override
   public DataSink createDataSink() {
     Preconditions.checkState(modifyStmt_.table_ instanceof FeIcebergTable);
-    return new IcebergDeleteSink(icePosDelTable_, deletePartitionKeyExprs_,
+    return new IcebergBufferedDeleteSink(icePosDelTable_, 
deletePartitionKeyExprs_,
         deleteResultExprs_);
   }
 }
\ No newline at end of file
diff --git 
a/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
index fda1f11be..660f84c33 100644
--- a/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/IcebergBufferedDeleteSink.java
@@ -113,7 +113,6 @@ public class IcebergBufferedDeleteSink extends TableSink {
   protected void toThriftImpl(TDataSink tsink) {
     TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink();
     
icebergDeleteSink.setPartition_key_exprs(Expr.treesToThrift(partitionKeyExprs_));
-    icebergDeleteSink.setIs_buffered(true);
     TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
         TTableSinkType.HDFS, sinkOp_.toThrift());
     tTableSink.iceberg_delete_sink = icebergDeleteSink;
diff --git a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java 
b/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
deleted file mode 100644
index 2e2387ddd..000000000
--- a/fe/src/main/java/org/apache/impala/planner/IcebergDeleteSink.java
+++ /dev/null
@@ -1,145 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.planner;
-
-import java.util.List;
-
-import org.apache.impala.analysis.DescriptorTable;
-import org.apache.impala.analysis.Expr;
-import org.apache.impala.catalog.FeIcebergTable;
-import org.apache.impala.common.ByteUnits;
-import org.apache.impala.thrift.TDataSink;
-import org.apache.impala.thrift.TDataSinkType;
-import org.apache.impala.thrift.TExplainLevel;
-import org.apache.impala.thrift.TIcebergDeleteSink;
-import org.apache.impala.thrift.TQueryOptions;
-import org.apache.impala.thrift.TTableSink;
-import org.apache.impala.thrift.TTableSinkType;
-
-/**
- * Sink for deleting data from Iceberg tables. Impala deletes data via 
'merge-on-read'
- * strategy, which means it writes Iceberg position delete files. These files 
contain
- * information (file_path, position) about the deleted rows. Query engines 
reading from
- * an Iceberg table need to exclude the deleted rows from the result of the 
table scan.
- * Impala does this by doing an ANTI JOIN between data files and delete files.
- */
-public class IcebergDeleteSink extends TableSink {
-  final private int deleteTableId_;
-
-  // Exprs for computing the output partition(s).
-  protected final List<Expr> partitionKeyExprs_;
-
-  public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> 
partitionKeyExprs,
-      List<Expr> outputExprs) {
-    this(targetTable, partitionKeyExprs, outputExprs, 0);
-  }
-
-  public IcebergDeleteSink(FeIcebergTable targetTable, List<Expr> 
partitionKeyExprs,
-      List<Expr> outputExprs, int deleteTableId) {
-    super(targetTable, Op.DELETE, outputExprs);
-    partitionKeyExprs_ = partitionKeyExprs;
-    deleteTableId_ = deleteTableId;
-  }
-
-  @Override
-  public void computeProcessingCost(TQueryOptions queryOptions) {
-    // The processing cost to export rows.
-    processingCost_ = computeDefaultProcessingCost();
-  }
-
-  @Override
-  public void computeResourceProfile(TQueryOptions queryOptions) {
-    PlanNode inputNode = fragment_.getPlanRoot();
-    final int numInstances = fragment_.getNumInstances();
-    // Input is clustered, so it produces a single partition at a time.
-    final long numBufferedPartitionsPerInstance = 1;
-    // For regular Parquet files we estimate 1GB memory consumption which is 
already
-    // a conservative, i.e. probably too high memory estimate.
-    // Writing out position delete files means we are writing filenames and 
positions
-    // per partition. So assuming 0.5 GB per position delete file writer can 
be still
-    // considered a very conservative estimate.
-    final long perPartitionMemReq = 512L * ByteUnits.MEGABYTE;
-
-    long perInstanceMemEstimate;
-    // The estimate is based purely on the per-partition mem req if the input 
cardinality_
-    // or the avg row size is unknown.
-    if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
-      perInstanceMemEstimate = numBufferedPartitionsPerInstance * 
perPartitionMemReq;
-    } else {
-      // The per-partition estimate may be higher than the memory required to 
buffer
-      // the entire input data.
-      long perInstanceInputCardinality =
-          Math.max(1L, inputNode.getCardinality() / numInstances);
-      long perInstanceInputBytes =
-          (long) Math.ceil(perInstanceInputCardinality * 
inputNode.getAvgRowSize());
-      long perInstanceMemReq =
-          PlanNode.checkedMultiply(numBufferedPartitionsPerInstance, 
perPartitionMemReq);
-      perInstanceMemEstimate = Math.min(perInstanceInputBytes, 
perInstanceMemReq);
-    }
-    resourceProfile_ = ResourceProfile.noReservation(perInstanceMemEstimate);
-  }
-
-  @Override
-  public void appendSinkExplainString(String prefix, String detailPrefix,
-      TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder 
output) {
-    output.append(String.format("%sDELETE FROM ICEBERG [%s]\n", prefix,
-        targetTable_.getFullName()));
-    if (explainLevel.ordinal() >= TExplainLevel.EXTENDED.ordinal()) {
-      output.append(detailPrefix + "output exprs: ")
-            .append(Expr.getExplainString(outputExprs_, explainLevel) + "\n");
-      if (!partitionKeyExprs_.isEmpty()) {
-        output.append(detailPrefix + "partition keys: ")
-              .append(Expr.getExplainString(partitionKeyExprs_, explainLevel) 
+ "\n");
-      }
-    }
-  }
-
-  @Override
-  protected String getLabel() {
-    return "ICEBERG DELETER";
-  }
-
-  @Override
-  protected void toThriftImpl(TDataSink tsink) {
-    TIcebergDeleteSink icebergDeleteSink = new TIcebergDeleteSink();
-    
icebergDeleteSink.setPartition_key_exprs(Expr.treesToThrift(partitionKeyExprs_));
-    TTableSink tTableSink = new TTableSink(DescriptorTable.TABLE_SINK_ID,
-            TTableSinkType.HDFS, sinkOp_.toThrift());
-    tTableSink.iceberg_delete_sink = icebergDeleteSink;
-    tTableSink.setTarget_table_id(deleteTableId_);
-    tsink.table_sink = tTableSink;
-    tsink.output_exprs = Expr.treesToThrift(outputExprs_);
-  }
-
-  @Override
-  protected TDataSinkType getSinkType() {
-    return TDataSinkType.TABLE_SINK;
-  }
-
-  @Override
-  public void collectExprs(List<Expr> exprs) {
-    exprs.addAll(partitionKeyExprs_);
-    exprs.addAll(outputExprs_);
-  }
-
-  @Override
-  public void computeRowConsumptionAndProductionToCost() {
-    super.computeRowConsumptionAndProductionToCost();
-    fragment_.setFixedInstanceCount(fragment_.getNumInstances());
-  }
-}
diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java 
b/fe/src/main/java/org/apache/impala/planner/TableSink.java
index 7c869f0da..5da64cba3 100644
--- a/fe/src/main/java/org/apache/impala/planner/TableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java
@@ -133,11 +133,8 @@ public abstract class TableSink extends DataSink {
       if (sinkAction == Op.INSERT) {
         return new HdfsTableSink(table, partitionKeyExprs,outputExprs, 
overwrite,
             inputIsClustered, sortProperties, writeId, maxTableSinks, 
isResultSink);
-      } else if (sinkAction == Op.DELETE) {
-        return new IcebergDeleteSink((FeIcebergTable)table, partitionKeyExprs,
-            outputExprs, maxTableSinks);
       } else {
-        // We don't support any other sink actions yet.
+        // Other SINK actions are either not supported or created directly.
         Preconditions.checkState(false);
       }
     }
diff --git 
a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 05f1d4e2e..f126bf16e 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -375,6 +375,9 @@ public class IcebergCatalogOpExecutor {
       // affected by this DELETE operation.
       rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
       rowDelta.validateNoConflictingDataFiles();
+      rowDelta.validateDataFilesExist(
+          icebergOp.getData_files_referenced_by_position_deletes());
+      rowDelta.validateDeletedFiles();
       rowDelta.commit();
     } catch (ValidationException e) {
       throw new ImpalaRuntimeException(e.getMessage(), e);
@@ -403,6 +406,9 @@ public class IcebergCatalogOpExecutor {
       rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
       rowDelta.validateNoConflictingDataFiles();
       rowDelta.validateNoConflictingDeleteFiles();
+      rowDelta.validateDataFilesExist(
+          icebergOp.getData_files_referenced_by_position_deletes());
+      rowDelta.validateDeletedFiles();
       rowDelta.commit();
     } catch (ValidationException e) {
       throw new ImpalaRuntimeException(e.getMessage(), e);
@@ -566,4 +572,4 @@ public class IcebergCatalogOpExecutor {
                     String.valueOf(version));
     updateProps.commit();
   }
-}
\ No newline at end of file
+}
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
index c75cdaea0..a0b8aceed 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/iceberg-v2-delete.test
@@ -1,10 +1,6 @@
 DELETE FROM iceberg_v2_no_deletes where i = 3
 ---- PLAN
-DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
-|
-01:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=20B cardinality=1
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes]
    HDFS partitions=1/1 files=1 size=625B
@@ -12,11 +8,7 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
    Iceberg snapshot id: 728158873687794725
    row-size=24B cardinality=1
 ---- DISTRIBUTEDPLAN
-DELETE FROM ICEBERG [functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
-|
-01:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=20B cardinality=1
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
 |
 00:SCAN HDFS [functional_parquet.iceberg_v2_no_deletes]
    HDFS partitions=1/1 files=1 size=625B
@@ -26,11 +18,7 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_no_deletes-POSITION-DELETE]
 ====
 DELETE FROM iceberg_v2_delete_positional where id = 15
 ---- PLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-03:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=20B cardinality=1
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=28B cardinality=1
@@ -46,11 +34,7 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DE
    Iceberg snapshot id: 5725822353600261755
    row-size=28B cardinality=1
 ---- DISTRIBUTEDPLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-04:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=20B cardinality=1
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
 |  row-size=28B cardinality=1
@@ -71,11 +55,7 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 DELETE FROM iceberg_v2_delete_positional
 where id = (select min(id) from iceberg_v2_delete_positional)
 ---- PLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-08:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=20B cardinality=2
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 07:HASH JOIN [LEFT SEMI JOIN]
 |  hash predicates: id = min(id)
@@ -113,11 +93,7 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DE
    Iceberg snapshot id: 5725822353600261755
    row-size=28B cardinality=3
 ---- DISTRIBUTEDPLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-13:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=20B cardinality=2
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 07:HASH JOIN [LEFT SEMI JOIN, BROADCAST]
 |  hash predicates: id = min(id)
@@ -169,11 +145,7 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 ====
 DELETE FROM iceberg_v2_delete_positional WHERE FILE__POSITION = id
 ---- PLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-03:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=20B cardinality=1
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
 |  row-size=28B cardinality=1
@@ -189,11 +161,7 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DE
    Iceberg snapshot id: 5725822353600261755
    row-size=28B cardinality=1
 ---- DISTRIBUTEDPLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
-|
-04:SORT
-|  order by: input__file__name ASC NULLS LAST, file__position ASC NULLS LAST
-|  row-size=20B cardinality=1
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DELETE]
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
 |  row-size=28B cardinality=1
@@ -213,10 +181,10 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_delete_positional-POSITION-DE
 ====
 delete from iceberg_v2_partitioned_position_deletes where id = 20;
 ---- PLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 03:SORT
-|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS 
LAST, file__position ASC NULLS LAST
+|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST
 |  row-size=36B cardinality=1
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
@@ -233,10 +201,10 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-
    Iceberg snapshot id: 8885697082976537578
    row-size=40B cardinality=2
 ---- DISTRIBUTEDPLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 05:SORT
-|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS 
LAST, file__position ASC NULLS LAST
+|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST
 |  row-size=36B cardinality=1
 |
 04:EXCHANGE 
[HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
@@ -259,10 +227,10 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-
 ====
 delete from iceberg_v2_partitioned_position_deletes where action = 'click';
 ---- PLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 03:SORT
-|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS 
LAST, file__position ASC NULLS LAST
+|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST
 |  row-size=36B cardinality=3
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
@@ -279,10 +247,10 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-
    skipped Iceberg predicates: action = 'click'
    row-size=36B cardinality=6
 ---- DISTRIBUTEDPLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 04:SORT
-|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS 
LAST, file__position ASC NULLS LAST
+|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST
 |  row-size=36B cardinality=3
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN, DIRECTED]
@@ -303,10 +271,10 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-
 ====
 delete from iceberg_v2_partitioned_position_deletes where user like 'A%';
 ---- PLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 03:SORT
-|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS 
LAST, file__position ASC NULLS LAST
+|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST
 |  row-size=36B cardinality=1
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
@@ -323,10 +291,10 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-
    Iceberg snapshot id: 8885697082976537578
    row-size=48B cardinality=2
 ---- DISTRIBUTEDPLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 05:SORT
-|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS 
LAST, file__position ASC NULLS LAST
+|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST
 |  row-size=36B cardinality=1
 |
 04:EXCHANGE 
[HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
@@ -350,10 +318,10 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-
 delete from iceberg_v2_partitioned_position_deletes
 where id = (select max(id) from iceberg_v2_delete_positional);
 ---- PLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 08:SORT
-|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS 
LAST, file__position ASC NULLS LAST
+|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST
 |  row-size=36B cardinality=10
 |
 07:HASH JOIN [LEFT SEMI JOIN]
@@ -392,10 +360,10 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-
    Iceberg snapshot id: 8885697082976537578
    row-size=40B cardinality=20
 ---- DISTRIBUTEDPLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 14:SORT
-|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS 
LAST, file__position ASC NULLS LAST
+|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST
 |  row-size=36B cardinality=10
 |
 13:EXCHANGE 
[HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
@@ -450,10 +418,10 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-
 ====
 DELETE FROM iceberg_v2_partitioned_position_deletes WHERE FILE__POSITION = id
 ---- PLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 03:SORT
-|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS 
LAST, file__position ASC NULLS LAST
+|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST
 |  row-size=36B cardinality=1
 |
 02:DELETE EVENTS ICEBERG DELETE [ICEBERG DELETE JOIN]
@@ -470,10 +438,10 @@ DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-
    Iceberg snapshot id: 8885697082976537578
    row-size=40B cardinality=2
 ---- DISTRIBUTEDPLAN
-DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
+BUFFERED DELETE FROM ICEBERG 
[functional_parquet.iceberg_v2_partitioned_position_deletes-POSITION-DELETE]
 |
 05:SORT
-|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST, input__file__name ASC NULLS 
LAST, file__position ASC NULLS LAST
+|  order by: partition__spec__id ASC NULLS LAST, 
iceberg__partition__serialized ASC NULLS LAST
 |  row-size=36B cardinality=1
 |
 04:EXCHANGE 
[HASH(functional_parquet.iceberg_v2_partitioned_position_deletes.PARTITION__SPEC__ID,functional_parquet.iceberg_v2_partitioned_position_deletes.ICEBERG__PARTITION__SERIALIZED)]
diff --git a/tests/stress/test_update_stress.py 
b/tests/stress/test_update_stress.py
index 9cc767746..35f380e31 100644
--- a/tests/stress/test_update_stress.py
+++ b/tests/stress/test_update_stress.py
@@ -149,7 +149,7 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite):
     run_tasks([updater_a, updater_b, updater_c] + checkers)
 
 
-class TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite):
+class TestIcebergConcurrentOperations(ImpalaTestSuite):
   """This test checks that concurrent DELETE and UPDATE operations leave the 
table
   in a consistent state."""
 
@@ -159,12 +159,12 @@ class 
TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite):
 
   @classmethod
   def add_test_dimensions(cls):
-    super(TestIcebergConcurrentDeletesAndUpdates, cls).add_test_dimensions()
+    super(TestIcebergConcurrentOperations, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: (v.get_value('table_format').file_format == 'parquet'
             and v.get_value('table_format').compression_codec == 'snappy'))
 
-  def _impala_role_concurrent_deleter(self, tbl_name, flag, num_rows):
+  def _impala_role_concurrent_deleter(self, tbl_name, all_rows_deleted, 
num_rows):
     """Deletes every row from the table one by one."""
     target_impalad = random.randint(0, 
ImpalaTestSuite.get_impalad_cluster_size() - 1)
     impalad_client = 
ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
@@ -179,15 +179,15 @@ class 
TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite):
         # Exceptions are expected due to concurrent operations.
         print(str(e))
       time.sleep(random.random())
-    flag.value = 1
+    all_rows_deleted.value = 1
     impalad_client.close()
 
-  def _impala_role_concurrent_writer(self, tbl_name, flag):
+  def _impala_role_concurrent_writer(self, tbl_name, all_rows_deleted):
     """Updates every row in the table in a loop."""
     target_impalad = random.randint(0, 
ImpalaTestSuite.get_impalad_cluster_size() - 1)
     impalad_client = 
ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
     impalad_client.set_configuration_option("SYNC_DDL", "true")
-    while flag.value != 1:
+    while all_rows_deleted.value != 1:
       try:
         impalad_client.execute(
             "update {0} set j = j + 1".format(tbl_name))
@@ -197,7 +197,21 @@ class 
TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite):
       time.sleep(random.random())
     impalad_client.close()
 
-  def _impala_role_concurrent_checker(self, tbl_name, flag, num_rows):
+  def _impala_role_concurrent_optimizer(self, tbl_name, all_rows_deleted):
+    """Optimizes the table in a loop."""
+    target_impalad = random.randint(0, 
ImpalaTestSuite.get_impalad_cluster_size() - 1)
+    impalad_client = 
ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    impalad_client.set_configuration_option("SYNC_DDL", "true")
+    while all_rows_deleted.value != 1:
+      try:
+        impalad_client.execute("optimize table {0}".format(tbl_name))
+      except Exception as e:
+        # Exceptions are expected due to concurrent operations.
+        print(str(e))
+      time.sleep(random.random())
+    impalad_client.close()
+
+  def _impala_role_concurrent_checker(self, tbl_name, all_rows_deleted, 
num_rows):
     """Checks if the table's invariant is true. The invariant is that we have a
     consecutive range of 'id's starting from N to num_rows - 1. And 'j's are 
equal."""
     def verify_result_set(result):
@@ -214,7 +228,7 @@ class 
TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite):
 
     target_impalad = random.randint(0, 
ImpalaTestSuite.get_impalad_cluster_size() - 1)
     impalad_client = 
ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
-    while flag.value != 1:
+    while all_rows_deleted.value != 1:
       result = impalad_client.execute("select * from %s order by id" % 
tbl_name)
       verify_result_set(result)
       time.sleep(random.random())
@@ -232,19 +246,52 @@ class 
TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite):
         stored as iceberg
         tblproperties('format-version'='2')""".format(tbl_name,))
 
+    num_rows = 10
+    for i in range(num_rows):
+      self.client.execute("insert into {} values ({}, 0)".format(tbl_name, i))
+
+    all_rows_deleted = Value('i', 0)
+    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, 
all_rows_deleted,
+        num_rows)
+    updater = Task(self._impala_role_concurrent_writer, tbl_name, 
all_rows_deleted)
+    checker = Task(self._impala_role_concurrent_checker, tbl_name, 
all_rows_deleted,
+        num_rows)
+    run_tasks([deleter, updater, checker])
+
+    result = self.client.execute("select count(*) from {}".format(tbl_name))
+    assert result.data == ['0']
+
+  @pytest.mark.stress
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_iceberg_deletes_and_updates_and_optimize(self, unique_database):
+    """Issues DELETE and UPDATE statements in parallel in a way that some
+    invariants must be true when a spectator process inspects the table.
+    An optimizer thread also invokes OPTIMIZE regularly on the table."""
+
+    tbl_name = "%s.test_concurrent_write_and_optimize" % unique_database
+    self.client.set_configuration_option("SYNC_DDL", "true")
+    self.client.execute("""create table {0} (id int, j bigint)
+        stored as iceberg
+        tblproperties('format-version'='2')""".format(tbl_name,))
+
     num_rows = 10
     values_str = ""
+    # Prepare INSERT statement of 'num_rows' records.
     for i in range(num_rows):
       values_str += "({}, 0)".format(i)
       if i != num_rows - 1:
         values_str += ", "
     self.client.execute("insert into {} values {}".format(tbl_name, 
values_str))
 
-    flag = Value('i', 0)
-    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, flag, 
num_rows)
-    updater = Task(self._impala_role_concurrent_writer, tbl_name, flag)
-    checker = Task(self._impala_role_concurrent_checker, tbl_name, flag, 
num_rows)
-    run_tasks([deleter, updater, checker])
+    all_rows_deleted = Value('i', 0)
+    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, 
all_rows_deleted,
+        num_rows)
+    updater = Task(self._impala_role_concurrent_writer, tbl_name, 
all_rows_deleted)
+    optimizer = Task(self._impala_role_concurrent_optimizer, tbl_name,
+        all_rows_deleted)
+    checker = Task(self._impala_role_concurrent_checker, tbl_name, 
all_rows_deleted,
+        num_rows)
+    run_tasks([deleter, updater, optimizer, checker])
 
     result = self.client.execute("select count(*) from {}".format(tbl_name))
     assert result.data == ['0']

Reply via email to