wuchong commented on a change in pull request #11906:
URL: https://github.com/apache/flink/pull/11906#discussion_r427835703



##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -126,31 +124,33 @@ public String getBaseUrl() {
 
        // ------ retrieve PK constraint ------
 
-       protected UniqueConstraint getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException {
+       protected Optional<UniqueConstraint> getPrimaryKey(DatabaseMetaData metaData, String schema, String table) throws SQLException {
 
                // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
                // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
                // We need to sort them based on the KEY_SEQ value.
                ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
 
-               List<Map.Entry<Integer, String>> columnsWithIndex = null;
+               Map<Integer, String> keySeqColumnName = new HashMap<>();
                String pkName = null;
-               while (rs.next()) {
+               while (rs.next())  {
                        String columnName = rs.getString("COLUMN_NAME");
-                       pkName = rs.getString("PK_NAME");
+                       pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
                        int keySeq = rs.getInt("KEY_SEQ");
-                       if (columnsWithIndex == null) {
-                               columnsWithIndex = new ArrayList<>();
-                       }
-                       columnsWithIndex.add(new AbstractMap.SimpleEntry<>(Integer.valueOf(keySeq), columnName));
+                       keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
                }
-               if (columnsWithIndex != null) {
-                       // sort columns by KEY_SEQ
-                       columnsWithIndex.sort(Comparator.comparingInt(Map.Entry::getKey));
-                       List<String> cols = columnsWithIndex.stream().map(Map.Entry::getValue).collect(Collectors.toList());
-                       return UniqueConstraint.primaryKey(pkName, cols);
+               List<String> pkFields = Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+               keySeqColumnName.forEach(pkFields::set);
+               if (!pkFields.isEmpty()) {
+                       if (pkName == null) {
+                               // PK_NAME maybe null according to the javadoc,
+                               // generate an unique name for the primary key
+                               pkName = "pk_" + String.join("_", pkFields);
+                       }
+                       return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+               } else {

Review comment:
       Yes... this is a guideline: return Optional instead of null unless there is a proven performance concern.
   
   > Always use Optional to return nullable values in the API/public methods except the case of a proven performance concern.
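   
   For illustration only, a minimal self-contained sketch of that guideline applied to a JDBC primary-key lookup: the method returns Optional.empty() instead of null when the table has no primary key. The class name, JDBC URL, credentials, schema, and table name below are placeholders, and the sketch returns plain column names rather than Flink's UniqueConstraint and keeps the driver's row order instead of re-sorting by KEY_SEQ as the PR does.
   
```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class PrimaryKeyLookup {

    /** Returns the primary key columns of a table, or Optional.empty() if the table has none. */
    public static Optional<List<String>> getPrimaryKeyColumns(
            Connection connection, String schema, String table) throws SQLException {
        DatabaseMetaData metaData = connection.getMetaData();
        // getPrimaryKeys orders rows by COLUMN_NAME, not KEY_SEQ; this sketch keeps
        // the returned order for brevity instead of re-sorting as the PR does.
        try (ResultSet rs = metaData.getPrimaryKeys(null, schema, table)) {
            List<String> columns = new ArrayList<>();
            while (rs.next()) {
                columns.add(rs.getString("COLUMN_NAME"));
            }
            // Optional instead of null: the absent case is explicit in the signature.
            return columns.isEmpty() ? Optional.empty() : Optional.of(columns);
        }
    }

    public static void main(String[] args) throws SQLException {
        // The JDBC URL, credentials, schema and table below are placeholders.
        try (Connection connection = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/mydb", "user", "password")) {
            Optional<List<String>> pk = getPrimaryKeyColumns(connection, "public", "orders");
            System.out.println(pk
                    .map(cols -> "PK columns: " + String.join(", ", cols))
                    .orElse("Table has no primary key"));
        }
    }
}
```
   
   Returning Optional here pushes the "no primary key" decision to the caller, which is exactly what the quoted guideline asks of public API methods.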
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

