leonardBang commented on a change in pull request #12273:
URL: https://github.com/apache/flink/pull/12273#discussion_r428437365
##########
File path:
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogITCase.java
##########
@@ -69,18 +69,39 @@ public void test_fullPath() throws Exception {
}
@Test
- public void test_insert() throws Exception {
+ public void testInsert() {
TableEnvironment tEnv = getTableEnvWithPgCatalog();
- TableResult tableResult = tEnv.executeSql(String.format("insert
into %s select * from `%s`", TABLE4, TABLE1));
- // wait to finish
-
tableResult.getJobClient().get().getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+ TableEnvUtil.execInsertSqlAndWaitResult(
+ tEnv,
+ String.format("insert into %s select * from `%s`",
TABLE4, TABLE1));
List<Row> results = Lists.newArrayList(
- tEnv.sqlQuery(String.format("select * from %s",
TABLE1)).execute().collect());
+ tEnv.sqlQuery(String.format("select * from %s",
TABLE4)).execute().collect());
assertEquals("[1]", results.toString());
}
+ @Test
+ public void testGroupByInsert() {
+ TableEnvironment tEnv = getTableEnvWithPgCatalog();
Review comment:
How about adding a `@Before` method to reuse `tEnv`, since every test uses it?
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -126,31 +124,29 @@ public String getBaseUrl() {
// ------ retrieve PK constraint ------
- protected UniqueConstraint getPrimaryKey(DatabaseMetaData metaData,
String schema, String table) throws SQLException {
+ protected Optional<UniqueConstraint> getPrimaryKey(DatabaseMetaData
metaData, String schema, String table) throws SQLException {
// According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME,
not by KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
- List<Map.Entry<Integer, String>> columnsWithIndex = null;
+ Map<Integer, String> keySeqColumnName = new HashMap<>();
String pkName = null;
- while (rs.next()) {
+ while (rs.next()) {
Review comment:
indent
##########
File path:
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -126,31 +124,29 @@ public String getBaseUrl() {
// ------ retrieve PK constraint ------
- protected UniqueConstraint getPrimaryKey(DatabaseMetaData metaData,
String schema, String table) throws SQLException {
+ protected Optional<UniqueConstraint> getPrimaryKey(DatabaseMetaData
metaData, String schema, String table) throws SQLException {
// According to the Javadoc of
java.sql.DatabaseMetaData#getPrimaryKeys,
// the returned primary key columns are ordered by COLUMN_NAME,
not by KEY_SEQ.
// We need to sort them based on the KEY_SEQ value.
ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
- List<Map.Entry<Integer, String>> columnsWithIndex = null;
+ Map<Integer, String> keySeqColumnName = new HashMap<>();
String pkName = null;
- while (rs.next()) {
+ while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
- pkName = rs.getString("PK_NAME");
+ pkName = rs.getString("PK_NAME"); // all the PK_NAME
should be the same
int keySeq = rs.getInt("KEY_SEQ");
- if (columnsWithIndex == null) {
- columnsWithIndex = new ArrayList<>();
- }
- columnsWithIndex.add(new
AbstractMap.SimpleEntry<>(Integer.valueOf(keySeq), columnName));
+ keySeqColumnName.put(keySeq - 1, columnName); //
KEY_SEQ is 1-based index
}
- if (columnsWithIndex != null) {
- // sort columns by KEY_SEQ
-
columnsWithIndex.sort(Comparator.comparingInt(Map.Entry::getKey));
- List<String> cols =
columnsWithIndex.stream().map(Map.Entry::getValue).collect(Collectors.toList());
- return UniqueConstraint.primaryKey(pkName, cols);
+ List<String> pkFields = Arrays.asList(new
String[keySeqColumnName.size()]); // initialize size
+ keySeqColumnName.forEach(pkFields::set);
+ if (!pkFields.isEmpty()) {
+ // PK_NAME maybe null according to the javadoc,
generate an unique name in that case
+ pkName = pkName != null ? pkName : "pk_" +
String.join("_", pkFields);
Review comment:
```suggestion
pkName = pkName == null ? "pk_" + String.join("_",
pkFields) : pkName;
```
How about this?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]