This is an automated email from the ASF dual-hosted git repository.
wzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
The following commit(s) were added to refs/heads/master by this push:
new a5a99adcd IMPALA-12376: DataSourceScanNode drop some returned rows
a5a99adcd is described below
commit a5a99adcd27caf9906a4836a79a3547ce2229905
Author: wzhou-code <[email protected]>
AuthorDate: Thu Aug 17 17:01:33 2023 -0700
IMPALA-12376: DataSourceScanNode drop some returned rows
DataSourceScanNode does not handle eos properly in function
DataSourceScanNode::GetNext(). Rows returned from the
external data source could be dropped if data_source_batch_size
is set to a value greater than the default value of 1024.
Testing:
- Added an end-to-end test with data_source_batch_size set to 2048.
The test failed without the fix and passed with it.
Also added a test with data_source_batch_size set to 512.
- Passed core tests.
Change-Id: I978d0a65faa63a47ec86a0127c0bee8dfb79530b
Reviewed-on: http://gerrit.cloudera.org:8080/20636
Reviewed-by: Abhishek Rawat <[email protected]>
Tested-by: Wenzhe Zhou <[email protected]>
---
be/src/exec/data-source-scan-node.cc | 5 +++--
.../queries/QueryTest/data-source-tables.test | 4 ++--
tests/custom_cluster/test_ext_data_sources.py | 14 ++++++++++++++
3 files changed, 19 insertions(+), 4 deletions(-)
diff --git a/be/src/exec/data-source-scan-node.cc
b/be/src/exec/data-source-scan-node.cc
index d2601e7af..5a88224fa 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -367,8 +367,9 @@ Status DataSourceScanNode::GetNext(RuntimeState* state,
RowBatch* row_batch, boo
}
++next_row_idx_;
}
- if (row_batch->AtCapacity() || input_batch_->eos || ReachedLimit()) {
- *eos = input_batch_->eos || ReachedLimit();
+ if (row_batch->AtCapacity() || ReachedLimit()
+ || (input_batch_->eos && !InputBatchHasNext())) {
+ *eos = (input_batch_->eos && !InputBatchHasNext()) || ReachedLimit();
COUNTER_SET(rows_returned_counter_, rows_returned());
COUNTER_ADD(rows_read_counter_, rows_read);
return Status::OK();
diff --git
a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
index 47de8f97f..8830a65a4 100644
---
a/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
+++
b/testdata/workloads/functional-query/queries/QueryTest/data-source-tables.test
@@ -168,8 +168,8 @@ union all
(select count(*) from alltypes_datasource
where smallint_col IS DISTINCT FROM 11 and tinyint_col IS DISTINCT FROM 1)
---- RESULTS
-4096
-4096
+4500
+4500
---- TYPES
BIGINT
====
diff --git a/tests/custom_cluster/test_ext_data_sources.py
b/tests/custom_cluster/test_ext_data_sources.py
index ec74a78f7..850e87e19 100644
--- a/tests/custom_cluster/test_ext_data_sources.py
+++ b/tests/custom_cluster/test_ext_data_sources.py
@@ -43,3 +43,17 @@ class TestExtDataSources(CustomClusterTestSuite):
def test_jdbc_data_source(self, vector, unique_database):
"""Start Impala cluster in LocalCatalog Mode"""
self.run_test_case('QueryTest/jdbc-data-source', vector,
use_db=unique_database)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args='--data_source_batch_size=2048')
+ def test_data_source_big_batch_size(self, vector, unique_database):
+ """Run test with batch size greater than default size 1024"""
+ self.run_test_case('QueryTest/data-source-tables', vector,
use_db=unique_database)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args='--data_source_batch_size=512')
+ def test_data_source_small_batch_size(self, vector, unique_database):
+ """Run test with batch size less than default size 1024"""
+ self.run_test_case('QueryTest/data-source-tables', vector,
use_db=unique_database)