This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2130e0d5ad [Fix] Fix oracle sample data from column error (#7340)
2130e0d5ad is described below

commit 2130e0d5ad720d2c4af71a886d4fed55d50c0499
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Tue Aug 20 10:37:27 2024 +0800

    [Fix] Fix oracle sample data from column error (#7340)
---
 .../internal/dialect/oracle/OracleDialect.java     | 42 ++++++++++++++++++++++
 .../connectors/seatunnel/jdbc/JdbcOracleIT.java    | 10 ++++++
 2 files changed, 52 insertions(+)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
index e1aee7f7d8..b6a35dba0c 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialect.java
@@ -35,6 +35,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
@@ -284,4 +285,45 @@ public class OracleDialect implements JdbcDialect {
             }
         }
     }
+
+    /**
+     * Samples the values of a single column and returns them in sorted order.
+     *
+     * <p>The sampling query selects only the requested column. When the table carries a custom
+     * query, that query is wrapped as a subquery aliased {@code T}; otherwise the column is read
+     * from the table identified by the table path. Rows are scanned in the order the database
+     * returns them and every {@code samplingRate}-th row is kept.
+     *
+     * <p>Sampled values that are {@code null} (SQL NULL) are skipped, because the final
+     * {@link Arrays#sort(Object[])} compares elements and cannot order {@code null}.
+     *
+     * @param connection connection used to create the sampling statement
+     * @param table source table; its query is sampled when not blank, otherwise its table path
+     * @param columnName name of the column to sample
+     * @param samplingRate keep one row out of every {@code samplingRate} rows; must not be zero
+     * @param fetchSize fetch size handed to {@code creatPreparedStatement}
+     * @return the sampled non-null values, sorted by their natural ordering
+     * @throws IllegalArgumentException if {@code samplingRate} is zero
+     * @throws InterruptedException if the current thread is interrupted while scanning rows
+     * @throws Exception if creating or executing the sampling statement fails
+     */
+    @Override
+    public Object[] sampleDataFromColumn(
+            Connection connection,
+            JdbcSourceTable table,
+            String columnName,
+            int samplingRate,
+            int fetchSize)
+            throws Exception {
+        // Fail fast: "count % 0" would otherwise throw ArithmeticException on the first row.
+        if (samplingRate == 0) {
+            throw new IllegalArgumentException("samplingRate must not be zero");
+        }
+
+        String sampleQuery;
+        if (StringUtils.isNotBlank(table.getQuery())) {
+            // Wrap the custom query as a subquery so only the sampled column is projected.
+            sampleQuery =
+                    String.format(
+                            "SELECT %s FROM (%s) T", quoteIdentifier(columnName), table.getQuery());
+        } else {
+            sampleQuery =
+                    String.format(
+                            "SELECT %s FROM %s",
+                            quoteIdentifier(columnName), tableIdentifier(table.getTablePath()));
+        }
+
+        try (PreparedStatement stmt = creatPreparedStatement(connection, sampleQuery, fetchSize);
+                ResultSet rs = stmt.executeQuery()) {
+            int count = 0;
+            List<Object> results = new ArrayList<>();
+
+            while (rs.next()) {
+                count++;
+                if (count % samplingRate == 0) {
+                    Object value = rs.getObject(1);
+                    // SQL NULL is returned as null, which Arrays.sort below cannot compare.
+                    if (value != null) {
+                        results.add(value);
+                    }
+                }
+                // Checked once per row so a long scan can be abandoned promptly.
+                if (Thread.currentThread().isInterrupted()) {
+                    throw new InterruptedException("Thread interrupted");
+                }
+            }
+            Object[] resultsArray = results.toArray();
+            Arrays.sort(resultsArray);
+            return resultsArray;
+        }
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
index e4b4de3950..b7c4a54b59 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracleIT.java
@@ -136,6 +136,16 @@ public class JdbcOracleIT extends AbstractJdbcIT {
                         .tablePath(TablePath.of(null, SCHEMA, SOURCE_TABLE))
                         .build();
         dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1, 
1024);
+
+        table =
+                JdbcSourceTable.builder()
+                        .tablePath(TablePath.of(null, SCHEMA, SOURCE_TABLE))
+                        .query(
+                                "select * from "
+                                        + quoteIdentifier(SOURCE_TABLE)
+                                        + " where INTEGER_COL = 1")
+                        .build();
+        dialect.sampleDataFromColumn(connection, table, "INTEGER_COL", 1, 
1024);
     }
 
     @Override

Reply via email to