This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch branch-4.4.1 in repository https://gitbox.apache.org/repos/asf/impala.git
commit 87479fa49ed044c1d092dc6ae04c98888fcf5b2b Author: wzhou-code <[email protected]> AuthorDate: Fri Jul 26 22:28:51 2024 -0700 IMPALA-13256: Support more than 2G rows for COUNT(*) on jdbc table Function DatabaseAccessor.getTotalNumberOfRecords() is defined with return data type as int. This caused Impala to throw an exception when executing COUNT(*) query for jdbc tables with more than 2G rows. This patch fixed the issue by changing the function return type as long. It also made number of rows in each TRowBatch fetched from jdbc data source not to exceed 2G. Testing: - Passed core-test and tpcds test for jdbc tables. - Manually created a jdbc table jdbc_table with more than 2G rows, verified that query 'select count(*) from jdbc_table' returned correct number of rows. Detailed steps were added in the comments of IMPALA-13256. Change-Id: I47db58300cbe3270bab07da02c3fcde6d7072334 Reviewed-on: http://gerrit.cloudera.org:8080/21617 Reviewed-by: Michael Smith <[email protected]> Reviewed-by: Abhishek Rawat <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/data-source-scan-node.cc | 13 +++++++++---- .../apache/impala/extdatasource/jdbc/JdbcDataSource.java | 11 +++++++---- .../impala/extdatasource/jdbc/dao/DatabaseAccessor.java | 2 +- .../extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java | 4 ++-- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/be/src/exec/data-source-scan-node.cc b/be/src/exec/data-source-scan-node.cc index 71275fdfa..80c47133e 100644 --- a/be/src/exec/data-source-scan-node.cc +++ b/be/src/exec/data-source-scan-node.cc @@ -143,6 +143,10 @@ Status DataSourceScanNode::ValidateRowBatchSize() { Substitute(ERROR_NUM_COLUMNS, tuple_desc_->slots().size(), cols.size())); } + // The capacity of output RowBatch is defined as a 4-byte integer. Making sure that + // the number of rows in input batch does not exceed the maximum capacity of output + // RowBatch. + DCHECK_LE(input_batch_->rows.num_rows, std::numeric_limits<int32_t>::max()); num_rows_ = -1; // If num_rows was set, use that, otherwise we set it to be the number of rows in // the first TColumnData and then ensure the number of rows in other columns are @@ -376,10 +380,11 @@ Status DataSourceScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, boo } } else { // For count(*) - rows_read += num_rows_; - next_row_idx_ += num_rows_; - IncrementNumRowsReturned(num_rows_); - if (input_batch_->eos) { + if (InputBatchHasNext()) { + // Generate one output RowBatch for one input batch + rows_read += num_rows_; + next_row_idx_ += num_rows_; + IncrementNumRowsReturned(num_rows_); row_batch->limit_capacity(rows_read); row_batch->CommitRows(rows_read); } diff --git a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java index 1c335d593..065934d06 100644 --- a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java +++ b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/JdbcDataSource.java @@ -214,10 +214,13 @@ public class JdbcDataSource implements ExternalDataSource { } if (!hasNext) eos_ = true; } else { // for count(*) - // Don't need to check batchSize_. - numRows = totalNumberOfRecords_ - currRow_; - currRow_ = totalNumberOfRecords_; - eos_ = true; + // Don't need to check batchSize_. But number of rows returned in a RowBatch can + // not exceed Integer.MAX_VALUE due to the restriction of RowBatch capacity in + // backend. + numRows = totalNumberOfRecords_ - currRow_ <= Integer.MAX_VALUE ? + totalNumberOfRecords_ - currRow_ : Integer.MAX_VALUE; + currRow_ += numRows; + eos_ = (currRow_ == totalNumberOfRecords_); } return new TGetNextResult(STATUS_OK).setEos(eos_) .setRows(new TRowBatch().setCols(cols).setNum_rows(numRows)); diff --git a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java index e1af2af76..393fb89a3 100644 --- a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java +++ b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/DatabaseAccessor.java @@ -24,7 +24,7 @@ import org.apache.impala.extdatasource.jdbc.exception.JdbcDatabaseAccessExceptio public interface DatabaseAccessor { - int getTotalNumberOfRecords(Configuration conf) + long getTotalNumberOfRecords(Configuration conf) throws JdbcDatabaseAccessException; JdbcRecordIterator getRecordIterator(Configuration conf, int limit, int offset) diff --git a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java index 0f7091d50..75f26ad92 100644 --- a/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java +++ b/fe/src/main/java/org/apache/impala/extdatasource/jdbc/dao/GenericJdbcDatabaseAccessor.java @@ -71,7 +71,7 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor { new DataSourceObjectCache(); @Override - public int getTotalNumberOfRecords(Configuration conf) + public long getTotalNumberOfRecords(Configuration conf) throws JdbcDatabaseAccessException { Connection conn = null; PreparedStatement ps = null; @@ -89,7 +89,7 @@ public class GenericJdbcDatabaseAccessor implements DatabaseAccessor { ps = conn.prepareStatement(countQuery); rs = ps.executeQuery(); if (rs.next()) { - return rs.getInt(1); + return rs.getLong(1); } else { LOG.warn("The count query '{}' did not return any results.", countQuery); throw new JdbcDatabaseAccessException(
