This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 68e61c1aabacee5da37b1f66842d8a2354d0e215
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Mon Feb 5 14:07:54 2024 +0100

    IMPALA-12787: Concurrent DELETE and UPDATE operations on Iceberg tables can 
be problematic
    
    If an UPDATE operation runs concurrently with a DELETE operation, and
    the DELETE commits first, then the UPDATE can revive deleted rows. This
    is because only RowDelta.validateNoConflictingDataFiles() is called, but
    RowDelta.validateNoConflictingDeleteFiles() is not. Therefore, the
    UPDATE operation ignores the concurrently written delete files.
    
    This patch adds RowDelta.validateNoConflictingDeleteFiles() to UPDATE
    operations.
    
    Testing
     * added a stress test to validate concurrent DELETE and UPDATE
        operations
    
    Change-Id: I9e581ea17fa8f6ccd9c87aaad1281bb694079f6e
    Reviewed-on: http://gerrit.cloudera.org:8080/20999
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 .../impala/service/IcebergCatalogOpExecutor.java   |  24 +++--
 tests/stress/test_update_stress.py                 | 103 +++++++++++++++++++++
 2 files changed, 120 insertions(+), 7 deletions(-)

diff --git 
a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java 
b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
index 27a62dea6..cc4d68248 100644
--- a/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/IcebergCatalogOpExecutor.java
@@ -367,7 +367,16 @@ public class IcebergCatalogOpExecutor {
       DeleteFile deleteFile = createDeleteFile(feIcebergTable, buf);
       rowDelta.addDeletes(deleteFile);
     }
-    validateAndCommitRowDelta(rowDelta, icebergOp.getInitial_snapshot_id());
+    try {
+      // Validate that there are no conflicting data files, because if data 
files are
+      // added in the meantime, they potentially contain records that should 
have been
+      // affected by this DELETE operation.
+      rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
+      rowDelta.validateNoConflictingDataFiles();
+      rowDelta.commit();
+    } catch (ValidationException e) {
+      throw new ImpalaRuntimeException(e.getMessage(), e);
+    }
   }
 
   private static void updateRows(FeIcebergTable feIcebergTable, Transaction 
txn,
@@ -384,14 +393,15 @@ public class IcebergCatalogOpExecutor {
       DataFile dataFile = createDataFile(feIcebergTable, buf);
       rowDelta.addRows(dataFile);
     }
-    validateAndCommitRowDelta(rowDelta, icebergOp.getInitial_snapshot_id());
-  }
-
-  private static void validateAndCommitRowDelta(RowDelta rowDelta,
-      long initialSnapshotId) throws ImpalaRuntimeException {
     try {
-      rowDelta.validateFromSnapshot(initialSnapshotId);
+      // Validate that there are no conflicting data files, because if data 
files are
+      // added in the meantime, they potentially contain records that should 
have been
+      // affected by this UPDATE operation. Also validate that there are no 
conflicting
+      // delete files, because we don't want to revive records that have been 
deleted
+      // in the meantime.
+      rowDelta.validateFromSnapshot(icebergOp.getInitial_snapshot_id());
       rowDelta.validateNoConflictingDataFiles();
+      rowDelta.validateNoConflictingDeleteFiles();
       rowDelta.commit();
     } catch (ValidationException e) {
       throw new ImpalaRuntimeException(e.getMessage(), e);
diff --git a/tests/stress/test_update_stress.py 
b/tests/stress/test_update_stress.py
index 01c295e9d..dec84ddfc 100644
--- a/tests/stress/test_update_stress.py
+++ b/tests/stress/test_update_stress.py
@@ -20,6 +20,7 @@ from builtins import map, range
 import pytest
 import random
 import time
+from multiprocessing import Value
 
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.parametrize import UniqueDatabase
@@ -146,3 +147,105 @@ class TestIcebergConcurrentUpdateStress(ImpalaTestSuite):
     checkers = [Task(self._impala_role_concurrent_checker, tbl_name, 
target_total)
                 for i in range(0, num_checkers)]
     run_tasks([updater_a, updater_b, updater_c] + checkers)
+
+
+class TestIcebergConcurrentDeletesAndUpdates(ImpalaTestSuite):
+  """This test checks that concurrent DELETE and UPDATE operations leave the 
table
+  in a consistent state."""
+
+  @classmethod
+  def get_workload(self):
+    return 'targeted-stress'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestIcebergConcurrentDeletesAndUpdates, cls).add_test_dimensions()
+    cls.ImpalaTestMatrix.add_constraint(
+        lambda v: (v.get_value('table_format').file_format == 'parquet'
+            and v.get_value('table_format').compression_codec == 'snappy'))
+
+  def _impala_role_concurrent_deleter(self, tbl_name, flag, num_rows):
+    """Deletes every row from the table one by one."""
+    target_impalad = random.randint(0, 
ImpalaTestSuite.get_impalad_cluster_size() - 1)
+    impalad_client = 
ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    i = 0
+    while i < num_rows:
+      try:
+        impalad_client.execute(
+            "delete from {0} WHERE id = {1}".format(tbl_name, i))
+        i += 1
+        # Sleep after a successful operation.
+        time.sleep(random.random())
+      except Exception:
+        # Exceptions are expected due to concurrent operations.
+        pass
+    flag.value = 1
+    impalad_client.close()
+
+  def _impala_role_concurrent_writer(self, tbl_name, flag):
+    """Updates every row in the table in a loop."""
+    target_impalad = random.randint(0, 
ImpalaTestSuite.get_impalad_cluster_size() - 1)
+    impalad_client = 
ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    while flag.value != 1:
+      try:
+        impalad_client.execute(
+            "update {0} set j = j + 1".format(tbl_name))
+        # Sleep after a successful operation.
+        time.sleep(random.random())
+      except Exception:
+        # Exceptions are expected due to concurrent operations.
+        pass
+    impalad_client.close()
+
+  def _impala_role_concurrent_checker(self, tbl_name, flag, num_rows):
+    """Checks if the table's invariant is true. The invariant is that we have a
+    consecutive range of 'id's starting from N to num_rows - 1. And 'j's are 
equal."""
+    def verify_result_set(result):
+      if len(result.data) == 0: return
+      line = result.data[0]
+      [prev_id, prev_j] = list(map(int, (line.split('\t'))))
+      for line in result.data[1:]:
+        [id, j] = list(map(int, (line.split('\t'))))
+        assert id - prev_id == 1
+        assert j == prev_j
+        prev_id = id
+        prev_j = j
+      assert prev_id == num_rows - 1
+
+    target_impalad = random.randint(0, 
ImpalaTestSuite.get_impalad_cluster_size() - 1)
+    impalad_client = 
ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
+    while flag.value != 1:
+      result = impalad_client.execute("select * from %s order by id" % 
tbl_name)
+      verify_result_set(result)
+      time.sleep(random.random())
+    impalad_client.close()
+
+  @pytest.mark.stress
+  @UniqueDatabase.parametrize(sync_ddl=True)
+  def test_iceberg_deletes_and_updates(self, unique_database):
+    """Issues DELETE and UPDATE statements in parallel in a way that some
+    invariants must be true when a spectator process inspects the table."""
+
+    tbl_name = "%s.test_concurrent_deletes_and_updates" % unique_database
+    self.client.set_configuration_option("SYNC_DDL", "true")
+    self.client.execute("""create table {0} (id int, j bigint)
+        stored as iceberg
+        tblproperties('format-version'='2')""".format(tbl_name,))
+
+    num_rows = 20
+    values_str = ""
+    for i in range(num_rows):
+      values_str += "({}, 0)".format(i)
+      if i != num_rows - 1:
+        values_str += ", "
+    self.client.execute("insert into {} values {}".format(tbl_name, 
values_str))
+
+    flag = Value('i', 0)
+    deleter = Task(self._impala_role_concurrent_deleter, tbl_name, flag, 
num_rows)
+    updater = Task(self._impala_role_concurrent_writer, tbl_name, flag)
+    checker = Task(self._impala_role_concurrent_checker, tbl_name, flag, 
num_rows)
+    run_tasks([deleter, updater, checker])
+
+    self.client.execute("refresh {}".format(tbl_name))
+    result = self.client.execute("select count(*) from {}".format(tbl_name))
+    assert result.data == ['0']

Reply via email to