This is an automated email from the ASF dual-hosted git repository.
michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new 541fc5ee9 IMPALA-12990: Fix impala-shell handling of unset rows_deleted
541fc5ee9 is described below
commit 541fc5ee9ec2d804f2ba45feb2df5bb96a013f86
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Wed Apr 10 17:20:00 2024 +0200
IMPALA-12990: Fix impala-shell handling of unset rows_deleted
The issue occurred in Python 3 when 0 rows were deleted from Iceberg.
It could also happen in other DMLs with older Impala servers where
TDmlResult.rows_deleted was not set. See the Jira for details of
the error.
Testing:
Extended shell tests for Kudu DML reporting to also cover Iceberg.
Change-Id: I5812b8006b9cacf34a7a0dbbc89a486d8b454438
Reviewed-on: http://gerrit.cloudera.org:8080/21284
Reviewed-by: Impala Public Jenkins <[email protected]>
Tested-by: Impala Public Jenkins <[email protected]>
---
shell/impala_client.py | 31 +++++-----
shell/impala_shell.py | 53 +++++++++-------
tests/custom_cluster/test_hs2_fault_injection.py | 10 +--
tests/shell/test_shell_commandline.py | 79 ++++++++++++++----------
4 files changed, 99 insertions(+), 74 deletions(-)
diff --git a/shell/impala_client.py b/shell/impala_client.py
index faf9486c3..b94df8931 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -311,9 +311,10 @@ class ImpalaClient(object):
# differentiate between DML and non-DML.
def close_dml(self, last_query_handle):
"""Fetches the results of a DML query. Returns a tuple containing the
- number of rows modified and the number of row errors, in that order. If
the DML
- operation doesn't return 'num_row_errors', then the second element in
the tuple
- is None. Returns None if the query was not closed successfully. Not
idempotent."""
+ number of rows modified, the number of rows deleted, and the number of
row errors,
+ in that order. If the DML operation doesn't return 'rows_deleted' or
+ 'num_row_errors', then the respective element in the tuple is None.
+ Returns None if the query was not closed successfully. Not
idempotent."""
raise NotImplementedError()
def close_query(self, last_query_handle):
@@ -661,6 +662,13 @@ class ImpalaClient(object):
# to add arbitrary http headers.
return None
+ def _process_dml_result(self, dml_result):
+ num_rows = sum([int(k) for k in dml_result.rows_modified.values()])
+ num_deleted_rows = None
+ if dml_result.rows_deleted:
+ num_deleted_rows = sum([int(k) for k in
dml_result.rows_deleted.values()])
+ return (num_rows, num_deleted_rows, dml_result.num_row_errors)
+
class ImpalaHS2Client(ImpalaClient):
"""Impala client. Uses the HS2 protocol plus Impala-specific extensions."""
@@ -972,14 +980,8 @@ class ImpalaHS2Client(ImpalaClient):
self._check_hs2_rpc_status(resp.status)
if not resp.dml_result:
raise RPCException("Impala DML operation did not return DML
statistics.")
-
- num_rows = sum([int(k) for k in resp.dml_result.rows_modified.values()])
- if resp.dml_result.rows_deleted:
- num_deleted_rows = sum([int(k) for k in
resp.dml_result.rows_deleted.values()])
- else:
- num_deleted_rows = None
last_query_handle.is_closed = True
- return (num_rows, num_deleted_rows, resp.dml_result.num_row_errors)
+ return self._process_dml_result(resp.dml_result)
finally:
self._clear_current_query_handle()
@@ -1449,17 +1451,12 @@ class ImpalaBeeswaxClient(ImpalaClient):
return
def close_dml(self, last_query_handle):
- insert_result, rpc_status = self._do_beeswax_rpc(
+ dml_result, rpc_status = self._do_beeswax_rpc(
lambda: self.imp_service.CloseInsert(last_query_handle))
if rpc_status != RpcStatus.OK:
raise RPCException()
- num_rows = sum([int(k) for k in insert_result.rows_modified.values()])
- if insert_result.rows_deleted:
- num_deleted_rows = sum([int(k) for k in
insert_result.rows_deleted.values()])
- else:
- num_deleted_rows = None
last_query_handle.is_closed = True
- return (num_rows, num_deleted_rows, insert_result.num_row_errors)
+ return self._process_dml_result(dml_result)
def close_query(self, last_query_handle):
# Set a member in the handle to make sure that it is idempotent
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index e76f09599..7998fce53 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -1343,6 +1343,31 @@ class ImpalaShell(cmd.Cmd, object):
"Est. #Rows", "Peak Mem",
"Est. Peak Mem", "Detail"])
+ def _format_num_rows_report(self, time_elapsed, num_fetched_rows=None,
dml_result=None):
+ num_rows = None
+ verb = None
+ error_report = ""
+ if dml_result is not None:
+ (num_modified_rows, num_deleted_rows, num_row_errors) = dml_result
+ if num_modified_rows == 0 and num_deleted_rows is not None and
num_deleted_rows > 0:
+ verb = "Deleted"
+ num_rows = num_deleted_rows
+ elif num_modified_rows is not None:
+ verb = "Modified"
+ num_rows = num_modified_rows
+ # Add the number of row errors if this DML and the operation supports it.
+ # num_row_errors is None if the DML operation doesn't return it.
+ if num_row_errors is not None:
+ error_report = ", %d row error(s)" % (num_row_errors)
+ elif num_fetched_rows is not None:
+ verb = "Fetched"
+ num_rows = num_fetched_rows
+
+ if verb is not None:
+ return "%s %d row(s)%s in %2.2fs" % (verb, num_rows, error_report,
time_elapsed)
+ else:
+ return "Time elapsed: %2.2fs" % time_elapsed
+
def _execute_stmt(self, query_str, is_dml=False, print_web_link=False):
"""Executes 'query_str' with options self.set_query_options on the Impala
server.
The query is run to completion and close with any results, warnings,
errors or
@@ -1381,8 +1406,7 @@ class ImpalaShell(cmd.Cmd, object):
if is_dml:
# retrieve the error log
warning_log = self.imp_client.get_warning_log(self.last_query_handle)
- (num_rows, num_deleted_rows, num_row_errors) =
self.imp_client.close_dml(
- self.last_query_handle)
+ dml_result = self.imp_client.close_dml(self.last_query_handle)
else:
# impalad does not support the fetching of metadata for certain types
of queries.
if not self.imp_client.expect_result_metadata(query_str,
self.last_query_handle):
@@ -1414,27 +1438,14 @@ class ImpalaShell(cmd.Cmd, object):
if warning_log:
self._print_if_verbose(warning_log)
- # print 'Modified' when is_dml is true (i.e. 1), or 'Fetched' otherwise.
- verb = ["Fetched", "Modified"][is_dml]
- time_elapsed = end_time - start_time
- # Add the number of row errors if this DML and the operation supports it.
- # num_row_errors is None if the DML operation doesn't return it.
- if is_dml and num_row_errors is not None:
- error_report = ", %d row error(s)" % (num_row_errors)
- else:
- error_report = ""
-
- if is_dml and num_rows == 0 and num_deleted_rows > 0:
- verb = "Deleted"
- self._print_if_verbose("%s %d row(s)%s in %2.2fs" %
- (verb, num_deleted_rows, error_report, time_elapsed))
- elif num_rows is not None:
- self._print_if_verbose("%s %d row(s)%s in %2.2fs" %
- (verb, num_rows, error_report, time_elapsed))
+ time_elapsed = end_time - start_time
+ row_report = ""
+ if is_dml:
+ row_report = self._format_num_rows_report(time_elapsed,
dml_result=dml_result)
else:
- self._print_if_verbose("Time elapsed: %2.2fs" %
- (time_elapsed))
+ row_report = self._format_num_rows_report(time_elapsed,
num_fetched_rows=num_rows)
+ self._print_if_verbose(row_report)
if not is_dml:
self.imp_client.close_query(self.last_query_handle)
diff --git a/tests/custom_cluster/test_hs2_fault_injection.py
b/tests/custom_cluster/test_hs2_fault_injection.py
index 67d52c2b7..7f326a9b9 100644
--- a/tests/custom_cluster/test_hs2_fault_injection.py
+++ b/tests/custom_cluster/test_hs2_fault_injection.py
@@ -312,13 +312,13 @@ class TestHS2FaultInjection(CustomClusterTestSuite):
query_handle = self.custom_hs2_http_client.execute_query(dml, {})
self.custom_hs2_http_client.wait_to_finish(query_handle)
self.transport.enable_fault(502, "Injected Fault", 0.50)
- (num_rows, num_row_errors) = None, None
+ exception = None
try:
- (num_rows, num_row_errors) =
self.custom_hs2_http_client.close_dml(query_handle)
+ self.custom_hs2_http_client.close_dml(query_handle)
except Exception as e:
- assert str(e) == 'HTTP code 502: Injected Fault'
- assert num_rows is None
- assert num_row_errors is None
+ exception = e
+ assert exception is not None
+ assert str(exception) == 'HTTP code 502: Injected Fault'
output = capsys.readouterr()[1].splitlines()
assert output[1] == self.__expect_msg_no_retry("CloseImpalaOperation")
diff --git a/tests/shell/test_shell_commandline.py
b/tests/shell/test_shell_commandline.py
index 32ed405da..3f348ba71 100644
--- a/tests/shell/test_shell_commandline.py
+++ b/tests/shell/test_shell_commandline.py
@@ -915,42 +915,59 @@ class TestImpalaShell(ImpalaTestSuite):
else:
assert "Modified 1 row(s)" in results.stderr
- def _validate_dml_stmt(self, vector, stmt, expected_rows_modified,
expected_row_errors):
- results = run_impala_shell_cmd(vector, ['--query=%s' % stmt])
- expected_output = "Modified %d row(s), %d row error(s)" %\
- (expected_rows_modified, expected_row_errors)
- assert expected_output in results.stderr, results.stderr
-
def test_kudu_dml_reporting(self, vector, unique_database):
if vector.get_value('strict_hs2_protocol'):
pytest.skip("Kudu not supported in strict hs2 mode.")
- db = unique_database
- run_impala_shell_cmd(vector, [
- '--query=create table %s.dml_test (id int primary key, '
- 'age int null) partition by hash(id) partitions 2 stored as kudu' %
db])
-
- self._validate_dml_stmt(
- vector, "insert into %s.dml_test (id) values (7), (7)" % db, 1, 1)
- self._validate_dml_stmt(vector, "insert into %s.dml_test (id) values (7)"
% db, 0, 1)
- self._validate_dml_stmt(
- vector, "upsert into %s.dml_test (id) values (7), (7)" % db, 2, 0)
- self._validate_dml_stmt(
- vector, "update %s.dml_test set age = 1 where id = 7" % db, 1, 0)
- self._validate_dml_stmt(vector, "delete from %s.dml_test where id = 7" %
db, 1, 0)
-
- # UPDATE/DELETE where there are no matching rows; there are no errors
because the
- # scan produced no rows.
- self._validate_dml_stmt(
- vector, "update %s.dml_test set age = 1 where id = 8" % db, 0, 0)
- self._validate_dml_stmt(vector, "delete from %s.dml_test where id = 7" %
db, 0, 0)
+ create_sql = 'create table %s (id int primary key, age int null)' \
+ 'partition by hash(id) partitions 2 stored as kudu'
+ self._test_dml_reporting(vector, create_sql, unique_database, True)
+
+ def test_iceberg_dml_reporting(self, vector, unique_database):
+ if vector.get_value('strict_hs2_protocol'):
+ pytest.skip("DML results are not completely supported in strict hs2
mode.")
+ create_sql = 'create table %s (id int, age int) ' \
+ 'stored as iceberg tblproperties("format-version"="2")'
+ self._test_dml_reporting(vector, create_sql, unique_database, False)
+
+ def _test_dml_reporting(self, vector, create_sql, db, is_kudu):
+ """ Runs DMLs on Kudu or Iceberg tables and verifies that modified /
deleted row
+ count and number of row errors in Kudu are reported correctly. Kudu
and Iceberg
+ can have different results when adding rows with the same primary key
as this
+ leads to row errors in Kudu.
+ """
+ tbl = db + ".dml_test"
+ run_impala_shell_cmd(vector, ['--query=' + create_sql % tbl])
+
+ def validate(stmt, expected_rows_modified_iceberg,
expected_rows_modified_kudu,
+ expected_row_errors_kudu, is_delete=False):
+ results = run_impala_shell_cmd(vector, ['--query=' + stmt % tbl])
+ expected = ""
+ if is_kudu:
+ expected = "Modified %d row(s), %d row error(s)" \
+ % (expected_rows_modified_kudu, expected_row_errors_kudu)
+ elif is_delete and expected_rows_modified_iceberg > 0:
+ expected = "Deleted %d row(s)" % expected_rows_modified_iceberg
+ else:
+ expected = "Modified %d row(s)" % expected_rows_modified_iceberg
+ assert expected in results.stderr, results.stderr
+
+ validate("insert into %s (id) values (7), (7)", 2, 1, 1)
+ validate("insert into %s (id) values (7)", 1, 0, 1)
+ if is_kudu:
+ validate("upsert into %s (id) values (7), (7)", -1, 2, 0)
+ validate("update %s set age = 1 where id = 7", 3, 1, 0)
+ validate("delete from %s where id = 7", 3, 1, 0, is_delete=True)
+
+ # UPDATE/DELETE where there are no matching rows; there are no errors in
Kudu because
+ # the scan produced no rows.
+ validate("update %s set age = 1 where id = 8", 0, 0, 0)
+ validate("delete from %s where id = 7", 0, 0, 0, is_delete=True)
# WITH clauses, only apply to INSERT and UPSERT
- self._validate_dml_stmt(vector,
- "with y as (values(7)) insert into %s.dml_test (id) select * from y" %
db, 1, 0)
- self._validate_dml_stmt(vector,
- "with y as (values(7)) insert into %s.dml_test (id) select * from y" %
db, 0, 1)
- self._validate_dml_stmt(vector,
- "with y as (values(7)) upsert into %s.dml_test (id) select * from y" %
db, 1, 0)
+ validate("with y as (values(7)) insert into %s (id) select * from y", 1,
1, 0)
+ validate("with y as (values(7)) insert into %s (id) select * from y", 1,
0, 1)
+ if is_kudu:
+ validate("with y as (values(7)) upsert into %s (id) select * from y",
-1, 1, 0)
def test_missing_query_file(self, vector):
result = run_impala_shell_cmd(vector, ['-f', 'nonexistent.sql'],
expect_success=False)