This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch branch-4.4.0
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 858794906158439de8b38a66739d62e1231bbd14
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Wed Apr 10 17:20:00 2024 +0200

    IMPALA-12990: Fix impala-shell handling of unset rows_deleted
    
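    The issue occurred in Python 3 when 0 rows were deleted from an
    Iceberg table. It could also happen in other DMLs with older Impala
    servers where TDmlResult.rows_deleted was not set. In both cases the
    number of modified rows was 0 and the number of deleted rows was
    None, so the shell evaluated None > 0 when reporting the result,
    which raises a TypeError in Python 3 (Python 2 silently evaluates it
    as False). See the Jira for details of the error.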
    
    Testing:
    Extended shell tests for Kudu DML reporting to also cover Iceberg.
    
    Change-Id: I5812b8006b9cacf34a7a0dbbc89a486d8b454438
    Reviewed-on: http://gerrit.cloudera.org:8080/21284
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 shell/impala_client.py                           | 31 +++++-----
 shell/impala_shell.py                            | 53 +++++++++-------
 tests/custom_cluster/test_hs2_fault_injection.py | 10 +--
 tests/shell/test_shell_commandline.py            | 79 ++++++++++++++----------
 4 files changed, 99 insertions(+), 74 deletions(-)

diff --git a/shell/impala_client.py b/shell/impala_client.py
index faf9486c3..b94df8931 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -311,9 +311,10 @@ class ImpalaClient(object):
   # differentiate between DML and non-DML.
   def close_dml(self, last_query_handle):
     """Fetches the results of a DML query. Returns a tuple containing the
-       number of rows modified and the number of row errors, in that order. If 
the DML
-       operation doesn't return 'num_row_errors', then the second element in 
the tuple
-       is None. Returns None if the query was not closed successfully. Not 
idempotent."""
+       number of rows modified, the number of rows deleted, and the number of 
row errors,
+       in that order. If the DML operation doesn't return 'rows_deleted' or
+       'num_row_errors', then the respective element in the tuple is None.
+       Returns None if the query was not closed successfully. Not 
idempotent."""
     raise NotImplementedError()
 
   def close_query(self, last_query_handle):
@@ -661,6 +662,13 @@ class ImpalaClient(object):
     # to add arbitrary http headers.
     return None
 
+  def _process_dml_result(self, dml_result):
+    num_rows = sum([int(k) for k in dml_result.rows_modified.values()])
+    num_deleted_rows = None
+    if dml_result.rows_deleted:
+      num_deleted_rows = sum([int(k) for k in 
dml_result.rows_deleted.values()])
+    return (num_rows, num_deleted_rows, dml_result.num_row_errors)
+
 
 class ImpalaHS2Client(ImpalaClient):
   """Impala client. Uses the HS2 protocol plus Impala-specific extensions."""
@@ -972,14 +980,8 @@ class ImpalaHS2Client(ImpalaClient):
       self._check_hs2_rpc_status(resp.status)
       if not resp.dml_result:
         raise RPCException("Impala DML operation did not return DML 
statistics.")
-
-      num_rows = sum([int(k) for k in resp.dml_result.rows_modified.values()])
-      if resp.dml_result.rows_deleted:
-        num_deleted_rows = sum([int(k) for k in 
resp.dml_result.rows_deleted.values()])
-      else:
-        num_deleted_rows = None
       last_query_handle.is_closed = True
-      return (num_rows, num_deleted_rows, resp.dml_result.num_row_errors)
+      return self._process_dml_result(resp.dml_result)
     finally:
       self._clear_current_query_handle()
 
@@ -1449,17 +1451,12 @@ class ImpalaBeeswaxClient(ImpalaClient):
         return
 
   def close_dml(self, last_query_handle):
-    insert_result, rpc_status = self._do_beeswax_rpc(
+    dml_result, rpc_status = self._do_beeswax_rpc(
         lambda: self.imp_service.CloseInsert(last_query_handle))
     if rpc_status != RpcStatus.OK:
        raise RPCException()
-    num_rows = sum([int(k) for k in insert_result.rows_modified.values()])
-    if insert_result.rows_deleted:
-      num_deleted_rows = sum([int(k) for k in 
insert_result.rows_deleted.values()])
-    else:
-      num_deleted_rows = None
     last_query_handle.is_closed = True
-    return (num_rows, num_deleted_rows, insert_result.num_row_errors)
+    return self._process_dml_result(dml_result)
 
   def close_query(self, last_query_handle):
     # Set a member in the handle to make sure that it is idempotent
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index e76f09599..7998fce53 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1343,6 +1343,31 @@ class ImpalaShell(cmd.Cmd, object):
                                              "Est. #Rows", "Peak Mem",
                                              "Est. Peak Mem", "Detail"])
 
+  def _format_num_rows_report(self, time_elapsed, num_fetched_rows=None, 
dml_result=None):
+    num_rows = None
+    verb = None
+    error_report = ""
+    if dml_result is not None:
+      (num_modified_rows, num_deleted_rows, num_row_errors) = dml_result
+      if num_modified_rows == 0 and num_deleted_rows is not None and 
num_deleted_rows > 0:
+        verb = "Deleted"
+        num_rows = num_deleted_rows
+      elif num_modified_rows is not None:
+        verb = "Modified"
+        num_rows = num_modified_rows
+      # Add the number of row errors if this DML and the operation supports it.
+      # num_row_errors is None if the DML operation doesn't return it.
+      if num_row_errors is not None:
+        error_report = ", %d row error(s)" % (num_row_errors)
+    elif num_fetched_rows is not None:
+      verb = "Fetched"
+      num_rows = num_fetched_rows
+
+    if verb is not None:
+      return "%s %d row(s)%s in %2.2fs" % (verb, num_rows, error_report, 
time_elapsed)
+    else:
+      return "Time elapsed: %2.2fs" % time_elapsed
+
   def _execute_stmt(self, query_str, is_dml=False, print_web_link=False):
     """Executes 'query_str' with options self.set_query_options on the Impala 
server.
     The query is run to completion and close with any results, warnings, 
errors or
@@ -1381,8 +1406,7 @@ class ImpalaShell(cmd.Cmd, object):
       if is_dml:
         # retrieve the error log
         warning_log = self.imp_client.get_warning_log(self.last_query_handle)
-        (num_rows, num_deleted_rows, num_row_errors) = 
self.imp_client.close_dml(
-            self.last_query_handle)
+        dml_result = self.imp_client.close_dml(self.last_query_handle)
       else:
         # impalad does not support the fetching of metadata for certain types 
of queries.
         if not self.imp_client.expect_result_metadata(query_str, 
self.last_query_handle):
@@ -1414,27 +1438,14 @@ class ImpalaShell(cmd.Cmd, object):
 
       if warning_log:
         self._print_if_verbose(warning_log)
-      # print 'Modified' when is_dml is true (i.e. 1), or 'Fetched' otherwise.
-      verb = ["Fetched", "Modified"][is_dml]
-      time_elapsed = end_time - start_time
 
-      # Add the number of row errors if this DML and the operation supports it.
-      # num_row_errors is None if the DML operation doesn't return it.
-      if is_dml and num_row_errors is not None:
-        error_report = ", %d row error(s)" % (num_row_errors)
-      else:
-        error_report = ""
-
-      if is_dml and num_rows == 0 and num_deleted_rows > 0:
-        verb = "Deleted"
-        self._print_if_verbose("%s %d row(s)%s in %2.2fs" %
-            (verb, num_deleted_rows, error_report, time_elapsed))
-      elif num_rows is not None:
-        self._print_if_verbose("%s %d row(s)%s in %2.2fs" %
-            (verb, num_rows, error_report, time_elapsed))
+      time_elapsed = end_time - start_time
+      row_report = ""
+      if is_dml:
+        row_report = self._format_num_rows_report(time_elapsed, 
dml_result=dml_result)
       else:
-        self._print_if_verbose("Time elapsed: %2.2fs" %
-            (time_elapsed))
+        row_report = self._format_num_rows_report(time_elapsed, 
num_fetched_rows=num_rows)
+      self._print_if_verbose(row_report)
 
       if not is_dml:
         self.imp_client.close_query(self.last_query_handle)
diff --git a/tests/custom_cluster/test_hs2_fault_injection.py 
b/tests/custom_cluster/test_hs2_fault_injection.py
index 67d52c2b7..7f326a9b9 100644
--- a/tests/custom_cluster/test_hs2_fault_injection.py
+++ b/tests/custom_cluster/test_hs2_fault_injection.py
@@ -312,13 +312,13 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
     query_handle = self.custom_hs2_http_client.execute_query(dml, {})
     self.custom_hs2_http_client.wait_to_finish(query_handle)
     self.transport.enable_fault(502, "Injected Fault", 0.50)
-    (num_rows, num_row_errors) = None, None
+    exception = None
     try:
-      (num_rows, num_row_errors) = 
self.custom_hs2_http_client.close_dml(query_handle)
+      self.custom_hs2_http_client.close_dml(query_handle)
     except Exception as e:
-      assert str(e) == 'HTTP code 502: Injected Fault'
-    assert num_rows is None
-    assert num_row_errors is None
+      exception = e
+    assert exception is not None
+    assert str(exception) == 'HTTP code 502: Injected Fault'
     output = capsys.readouterr()[1].splitlines()
     assert output[1] == self.__expect_msg_no_retry("CloseImpalaOperation")
 
diff --git a/tests/shell/test_shell_commandline.py 
b/tests/shell/test_shell_commandline.py
index 32ed405da..3f348ba71 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -915,42 +915,59 @@ class TestImpalaShell(ImpalaTestSuite):
     else:
       assert "Modified 1 row(s)" in results.stderr
 
-  def _validate_dml_stmt(self, vector, stmt, expected_rows_modified, 
expected_row_errors):
-    results = run_impala_shell_cmd(vector, ['--query=%s' % stmt])
-    expected_output = "Modified %d row(s), %d row error(s)" %\
-        (expected_rows_modified, expected_row_errors)
-    assert expected_output in results.stderr, results.stderr
-
   def test_kudu_dml_reporting(self, vector, unique_database):
     if vector.get_value('strict_hs2_protocol'):
       pytest.skip("Kudu not supported in strict hs2 mode.")
-    db = unique_database
-    run_impala_shell_cmd(vector, [
-        '--query=create table %s.dml_test (id int primary key, '
-        'age int null) partition by hash(id) partitions 2 stored as kudu' % 
db])
-
-    self._validate_dml_stmt(
-        vector, "insert into %s.dml_test (id) values (7), (7)" % db, 1, 1)
-    self._validate_dml_stmt(vector, "insert into %s.dml_test (id) values (7)" 
% db, 0, 1)
-    self._validate_dml_stmt(
-        vector, "upsert into %s.dml_test (id) values (7), (7)" % db, 2, 0)
-    self._validate_dml_stmt(
-        vector, "update %s.dml_test set age = 1 where id = 7" % db, 1, 0)
-    self._validate_dml_stmt(vector, "delete from %s.dml_test where id = 7" % 
db, 1, 0)
-
-    # UPDATE/DELETE where there are no matching rows; there are no errors 
because the
-    # scan produced no rows.
-    self._validate_dml_stmt(
-        vector, "update %s.dml_test set age = 1 where id = 8" % db, 0, 0)
-    self._validate_dml_stmt(vector, "delete from %s.dml_test where id = 7" % 
db, 0, 0)
+    create_sql = 'create table %s (id int primary key, age int null)' \
+        'partition by hash(id) partitions 2 stored as kudu'
+    self._test_dml_reporting(vector, create_sql, unique_database, True)
+
+  def test_iceberg_dml_reporting(self, vector, unique_database):
+    if vector.get_value('strict_hs2_protocol'):
+      pytest.skip("DML results are not completely supported in strict hs2 
mode.")
+    create_sql = 'create table %s (id int, age int) ' \
+        'stored as iceberg tblproperties("format-version"="2")'
+    self._test_dml_reporting(vector, create_sql, unique_database, False)
+
+  def _test_dml_reporting(self, vector, create_sql, db, is_kudu):
+    """ Runs DMLs on Kudu or Iceberg tables and verifies that modified / 
deleted row
+        count and number of row errors in Kudu are reported correctly. Kudu 
and Iceberg
+        can have different results when adding rows with the same primary key 
as this
+        leads to row errors in Kudu.
+    """
+    tbl = db + ".dml_test"
+    run_impala_shell_cmd(vector, ['--query=' + create_sql % tbl])
+
+    def validate(stmt, expected_rows_modified_iceberg, 
expected_rows_modified_kudu,
+                 expected_row_errors_kudu, is_delete=False):
+      results = run_impala_shell_cmd(vector, ['--query=' + stmt % tbl])
+      expected = ""
+      if is_kudu:
+        expected = "Modified %d row(s), %d row error(s)" \
+            % (expected_rows_modified_kudu, expected_row_errors_kudu)
+      elif is_delete and expected_rows_modified_iceberg > 0:
+        expected = "Deleted %d row(s)" % expected_rows_modified_iceberg
+      else:
+        expected = "Modified %d row(s)" % expected_rows_modified_iceberg
+      assert expected in results.stderr, results.stderr
+
+    validate("insert into %s (id) values (7), (7)", 2, 1, 1)
+    validate("insert into %s (id) values (7)", 1, 0, 1)
+    if is_kudu:
+      validate("upsert into %s (id) values (7), (7)", -1, 2, 0)
+    validate("update %s set age = 1 where id = 7", 3, 1, 0)
+    validate("delete from %s where id = 7", 3, 1, 0, is_delete=True)
+
+    # UPDATE/DELETE where there are no matching rows; there are no errors in 
Kudu because
+    # the scan produced no rows.
+    validate("update %s set age = 1 where id = 8", 0, 0, 0)
+    validate("delete from %s where id = 7", 0, 0, 0, is_delete=True)
 
     # WITH clauses, only apply to INSERT and UPSERT
-    self._validate_dml_stmt(vector,
-        "with y as (values(7)) insert into %s.dml_test (id) select * from y" % 
db, 1, 0)
-    self._validate_dml_stmt(vector,
-        "with y as (values(7)) insert into %s.dml_test (id) select * from y" % 
db, 0, 1)
-    self._validate_dml_stmt(vector,
-        "with y as (values(7)) upsert into %s.dml_test (id) select * from y" % 
db, 1, 0)
+    validate("with y as (values(7)) insert into %s (id) select * from y", 1, 
1, 0)
+    validate("with y as (values(7)) insert into %s (id) select * from y", 1, 
0, 1)
+    if is_kudu:
+      validate("with y as (values(7)) upsert into %s (id) select * from y", 
-1, 1, 0)
 
   def test_missing_query_file(self, vector):
     result = run_impala_shell_cmd(vector, ['-f', 'nonexistent.sql'], 
expect_success=False)

Reply via email to