This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 2130e0d5ad [Fix] Fix oracle sample data from column error (#7340) 2130e0d5ad is described below commit 2130e0d5ad720d2c4af71a886d4fed55d50c0499 Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Tue Aug 20 10:37:27 2024 +0800 [Fix] Fix oracle sample data from column error (#7340) --- .../internal/dialect/oracle/OracleDialect.java | 42 ++++++++++++++++++++++ .../connectors/seatunnel/jdbc/JdbcOracleIT.java | 10 ++++++ 2 files changed, 52 insertions(+) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java index e1aee7f7d8..b6a35dba0c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java @@ -35,6 +35,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -284,4 +285,45 @@ public class OracleDialect implements JdbcDialect { } } } + + @Override + public Object[] sampleDataFromColumn( + Connection connection, + JdbcSourceTable table, + String columnName, + int samplingRate, + int fetchSize) + throws Exception { + String sampleQuery; + if (StringUtils.isNotBlank(table.getQuery())) { + sampleQuery = + String.format( + "SELECT %s FROM (%s) T", quoteIdentifier(columnName), table.getQuery()); + } else { + sampleQuery = + String.format( + "SELECT %s FROM %s", + quoteIdentifier(columnName), tableIdentifier(table.getTablePath())); + } + + try (PreparedStatement stmt = creatPreparedStatement(connection, sampleQuery, fetchSize)) { + try (ResultSet rs = stmt.executeQuery()) { + int count = 0; + List<Object> results = new ArrayList<>(); + + while (rs.next()) { + count++; + if (count % samplingRate == 0) { + results.add(rs.getObject(1)); + } + if (Thread.currentThread().isInterrupted()) { + throw new InterruptedException("Thread interrupted"); + } + } + Object[] resultsArray = results.toArray(); + Arrays.sort(resultsArray); + return resultsArray; + } + } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java index e4b4de3950..b7c4a54b59 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java @@ -136,6 +136,16 @@ public class JdbcOracleIT extends AbstractJdbcIT { .tablePath(TablePath.of(null, SCHEMA, SOURCE_TABLE)) .build(); dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1, 1024); + + table = + JdbcSourceTable.builder() + .tablePath(TablePath.of(null, SCHEMA, SOURCE_TABLE)) + .query( + "select * from " + + quoteIdentifier(SOURCE_TABLE) + + " where INTEGER_COL = 1") + .build(); + dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1, 1024); } @Override