This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch branch-4.4.1
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 87479fa49ed044c1d092dc6ae04c98888fcf5b2b
Author: wzhou-code <[email protected]>
AuthorDate: Fri Jul 26 22:28:51 2024 -0700

    IMPALA-13256: Support more than 2G rows for COUNT(*) on jdbc table
    
    DatabaseAccessor.getTotalNumberOfRecords() was declared with a return
    type of int. This caused Impala to throw an exception when executing a
    COUNT(*) query on a jdbc table with more than 2G rows.
    This patch fixes the issue by changing the function's return type to
    long. It also caps the number of rows in each TRowBatch fetched from
    the jdbc data source at Integer.MAX_VALUE, because the capacity of the
    backend RowBatch is a 4-byte integer.
    
    Testing:
     - Passed core-test and tpcds test for jdbc tables.
     - Manually created a jdbc table jdbc_table with more than 2G rows and
       verified that the query 'select count(*) from jdbc_table' returned
       the correct number of rows. Detailed steps were added in the
       comments of IMPALA-13256.
    
    Change-Id: I47db58300cbe3270bab07da02c3fcde6d7072334
    Reviewed-on: http://gerrit.cloudera.org:8080/21617
    Reviewed-by: Michael Smith <[email protected]>
    Reviewed-by: Abhishek Rawat <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/data-source-scan-node.cc                        | 13 +++++++++----
 .../apache/impala/extdatasource/jdbc/JdbcDataSource.java    | 11 +++++++----
 .../impala/extdatasource/jdbc/dao/DatabaseAccessor.java     |  2 +-
 .../extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java |  4 ++--
 4 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/be/src/exec/data-source-scan-node.cc 
b/be/src/exec/data-source-scan-node.cc
index 71275fdfa..80c47133e 100644
--- a/be/src/exec/data-source-scan-node.cc
+++ b/be/src/exec/data-source-scan-node.cc
@@ -143,6 +143,10 @@ Status DataSourceScanNode::ValidateRowBatchSize() {
         Substitute(ERROR_NUM_COLUMNS, tuple_desc_->slots().size(), 
cols.size()));
   }
 
+  // The capacity of output RowBatch is defined as a 4-byte integer. Making 
sure that
+  // the number of rows in input batch does not exceed the maximum capacity of 
output
+  // RowBatch.
+  DCHECK_LE(input_batch_->rows.num_rows, std::numeric_limits<int32_t>::max());
   num_rows_ = -1;
   // If num_rows was set, use that, otherwise we set it to be the number of 
rows in
   // the first TColumnData and then ensure the number of rows in other columns 
are
@@ -376,10 +380,11 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, 
RowBatch* row_batch, boo
         }
       } else {
         // For count(*)
-        rows_read += num_rows_;
-        next_row_idx_ += num_rows_;
-        IncrementNumRowsReturned(num_rows_);
-        if (input_batch_->eos) {
+        if (InputBatchHasNext()) {
+          // Generate one output RowBatch for one input batch
+          rows_read += num_rows_;
+          next_row_idx_ += num_rows_;
+          IncrementNumRowsReturned(num_rows_);
           row_batch->limit_capacity(rows_read);
           row_batch->CommitRows(rows_read);
         }
diff --git 
a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java 
b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
index 1c335d593..065934d06 100644
--- a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
+++ b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java
@@ -214,10 +214,13 @@ public class JdbcDataSource implements ExternalDataSource 
{
       }
       if (!hasNext) eos_ = true;
     } else { // for count(*)
-      // Don't need to check batchSize_.
-      numRows = totalNumberOfRecords_ - currRow_;
-      currRow_ = totalNumberOfRecords_;
-      eos_ = true;
+      // Don't need to check batchSize_. But number of rows returned in a 
RowBatch can
+      // not exceed Integer.MAX_VALUE due to the restriction of RowBatch 
capacity in
+      // backend.
+      numRows = totalNumberOfRecords_ - currRow_ <= Integer.MAX_VALUE ?
+          totalNumberOfRecords_ - currRow_ : Integer.MAX_VALUE;
+      currRow_ += numRows;
+      eos_ = (currRow_ == totalNumberOfRecords_);
     }
     return new TGetNextResult(STATUS_OK).setEos(eos_)
         .setRows(new TRowBatch().setCols(cols).setNum_rows(numRows));
diff --git 
a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java
 
b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java
index e1af2af76..393fb89a3 100644
--- 
a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java
+++ 
b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java
@@ -24,7 +24,7 @@ import 
org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessExceptio
 
 public interface DatabaseAccessor {
 
-  int getTotalNumberOfRecords(Configuration conf)
+  long getTotalNumberOfRecords(Configuration conf)
       throws JdbcDatabaseAccessException;
 
   JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int 
offset)
diff --git 
a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
 
b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
index 0f7091d50..75f26ad92 100644
--- 
a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
+++ 
b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java
@@ -71,7 +71,7 @@ public class GenericJdbcDatabaseAccessor implements 
DatabaseAccessor {
       new DataSourceObjectCache();
 
   @Override
-  public int getTotalNumberOfRecords(Configuration conf)
+  public long getTotalNumberOfRecords(Configuration conf)
       throws JdbcDatabaseAccessException {
     Connection conn = null;
     PreparedStatement ps = null;
@@ -89,7 +89,7 @@ public class GenericJdbcDatabaseAccessor implements 
DatabaseAccessor {
       ps = conn.prepareStatement(countQuery);
       rs = ps.executeQuery();
       if (rs.next()) {
-        return rs.getInt(1);
+        return rs.getLong(1);
       } else {
         LOG.warn("The count query '{}' did not return any results.", 
countQuery);
         throw new JdbcDatabaseAccessException(

Reply via email to