healchow commented on code in PR #6979:
URL: https://github.com/apache/inlong/pull/6979#discussion_r1052901776


##########
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java:
##########
@@ -137,4 +146,71 @@ public List<LogicalTypeRoot> unsupportedTypes() {
                 LogicalTypeRoot.SYMBOL,
                 LogicalTypeRoot.UNRESOLVED);
     }
+
+    @Override
+    public List<String> getAndSetPkNamesFromDb(String tableIdentifier, JdbcOptions jdbcOptions) {
+        PreparedStatement st = null;
+        try {
+            JdbcOptions jdbcExecOptions =
+                    JdbcOptions.builder()
+                            .setDBUrl(jdbcOptions.getDbURL() + "/"
+                                    + JdbcMultiBatchingComm.getTDbNameFromIdentifier(tableIdentifier))
+                            .setTableName(JdbcMultiBatchingComm.getTbNameFromIdentifier(tableIdentifier))
+                            .setDialect(jdbcOptions.getDialect())
+                            .setParallelism(jdbcOptions.getParallelism())
+                            .setConnectionCheckTimeoutSeconds(jdbcOptions.getConnectionCheckTimeoutSeconds())
+                            .setDriverName(jdbcOptions.getDriverName())
+                            .setUsername(jdbcOptions.getUsername().orElse(""))
+                            .setPassword(jdbcOptions.getPassword().orElse(""))
+                            .build();
+            SimpleJdbcConnectionProvider tableConnectionProvider = new SimpleJdbcConnectionProvider(jdbcExecOptions);
+            Connection conn = tableConnectionProvider.getOrEstablishConnection();
+            String query = "SELECT\n" +
+                    "\tstring_agg (DISTINCT t3.attname, ',') AS pkColumn,\n" +
+                    "    \tt4.tablename AS tableName\n" +
+                    "FROM\n" +
+                    "\tpg_constraint t1\n" +
+                    "INNER JOIN pg_class t2 ON t1.conrelid = t2.oid\n" +
+                    "INNER JOIN pg_attribute t3 ON t3.attrelid = t2.oid\n" +
+                    "AND array_position (t1.conkey, t3.attnum) is not null\n" +
+                    "INNER JOIN pg_tables t4 on t4.tablename = t2.relname\n" +
+                    "INNER JOIN pg_index t5 ON t5.indrelid = t2.oid\n" +
+                    "AND t3.attnum = ANY (t5.indkey)\n" +
+                    "LEFT JOIN pg_description t6 on t6.objoid = t3.attrelid\n" +
+                    "and t6.objsubid = t3.attnum\n" +
+                    "WHERE\n" +
+                    "\tt1.contype = 'p'\n" +
+                    "AND length (t3.attname) > 0\n" +
+                    "AND t2.oid = ?::regclass\n" +
+                    "group by\n" +
+                    "\tt4.tablename";
+            st = conn.prepareStatement(query);
+            st.setString(1, JdbcMultiBatchingComm.getTbNameFromIdentifier(tableIdentifier));
+            ResultSet rs = st.executeQuery();
+            if (rs.next()) {
+                String pkColumn = rs.getString("pkColumn");
+                LOG.info("TableIdentifier:{} get pkColumn:{}", tableIdentifier, pkColumn);
+                checkAndClose(st);
+                return Arrays.asList(pkColumn.split(","));
+            } else {
+                LOG.info("TableIdentifier:{} get pkColumn: null", tableIdentifier);
+                checkAndClose(st);
+                return null;
+            }
+        } catch (Exception e) {
+            LOG.error("TableIdentifier:{} getAndSetPkNamesFromDb get err:", tableIdentifier, e);
+            checkAndClose(st);
+        }
+        return null;
+    }
+
+    public void checkAndClose(PreparedStatement st) {

Review Comment:
   It seems `private` visibility is enough for this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to