fsk119 commented on code in PR #20493: URL: https://github.com/apache/flink/pull/20493#discussion_r940202528
########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ########## @@ -294,4 +408,64 @@ private static List<Type> getSupportedHiveType() { INTERVAL_YEAR_MONTH_TYPE, INTERVAL_DAY_TIME_TYPE)); } + + /** + * The column size for this type. For numeric data this is the maximum precision. For character + * data this is the length in characters. For datetime types this is the length in characters of + * the String representation (assuming the maximum allowed precision of the fractional seconds + * component). For binary data this is the length in bytes. Null is returned for for data types + * where the column size is not applicable. + */ + // TODO + private static Integer getColumnSize(Type hiveColumnType, LogicalType flinkColumnType) { + if (hiveColumnType.isNumericType()) { + // Exactly precision for DECIMAL_TYPE and maximum precision for others. + return hiveColumnType == Type.DECIMAL_TYPE + ? ((DecimalType) flinkColumnType).getPrecision() + : hiveColumnType.getMaxPrecision(); + } + switch (hiveColumnType) { + case STRING_TYPE: + case BINARY_TYPE: + return Integer.MAX_VALUE; + case CHAR_TYPE: + case VARCHAR_TYPE: + return TypeInfoUtils.getCharacterLengthForType( + getPrimitiveTypeInfo(hiveColumnType.getName())); + case DATE_TYPE: + return 10; + case TIMESTAMP_TYPE: + return 29; + // case TIMESTAMPLOCALTZ_TYPE: + // return 31; + // 还是用flinkColumnType来实现? + default: + return null; + } + } + + /** + * The number of fractional digits for this type. Null is returned for data types where this is + * not applicable. + */ + private static Integer getDecimalDigits(Type hiveColumnType, LogicalType flinkColumnType) { Review Comment: ditto ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ########## @@ -294,4 +408,64 @@ private static List<Type> getSupportedHiveType() { INTERVAL_YEAR_MONTH_TYPE, INTERVAL_DAY_TIME_TYPE)); } + + /** + * The column size for this type. For numeric data this is the maximum precision. For character + * data this is the length in characters. For datetime types this is the length in characters of + * the String representation (assuming the maximum allowed precision of the fractional seconds + * component). For binary data this is the length in bytes. Null is returned for for data types + * where the column size is not applicable. + */ + // TODO + private static Integer getColumnSize(Type hiveColumnType, LogicalType flinkColumnType) { Review Comment: I think it's better to use LogicalType only because the two inputs are equal. Add `@Nullable` for the return type. ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ########## @@ -203,10 +229,98 @@ private static ResultSet executeGetTables( .collect(Collectors.toList())); } + private static ResultSet executeGetColumns( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName, + @Nullable String tableName, + @Nullable String columnName) { + String specifiedCatalogName = + isNullOrEmpty(catalogName) ? service.getCurrentCatalog(sessionHandle) : catalogName; + Set<String> schemaNames = + filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName); + Set<TableKind> tableKinds = new HashSet<>(Arrays.asList(TableKind.TABLE, TableKind.VIEW)); + List<Object[]> rowData = new ArrayList<>(); + + for (String schema : schemaNames) { + Set<TableInfo> tableInfos = + filter( + service.listTables( + sessionHandle, specifiedCatalogName, schema, tableKinds), + candidates -> candidates.getIdentifier().getObjectName(), + tableName); + + for (TableInfo tableInfo : tableInfos) { + ResolvedCatalogBaseTable<?> table = + service.getTable(sessionHandle, tableInfo.getIdentifier()); + List<Column> columns = table.getResolvedSchema().getColumns(); + + Set<String> requiredColumnNames = + filter( + new HashSet<>(table.getResolvedSchema().getColumnNames()), + columnName); + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + if (requiredColumnNames.contains(column.getName())) { + LogicalType flinkColumnType = column.getDataType().getLogicalType(); + Type hiveColumnType = + Type.getType( + HiveTypeUtil.toHiveTypeInfo(column.getDataType(), false)); + rowData.add( + new Object[] { + specifiedCatalogName, // TABLE_CAT + tableInfo.getIdentifier().getDatabaseName(), // TABLE_SCHEMA + tableInfo.getIdentifier().getObjectName(), // TABLE_NAME + column.getName(), // COLUMN_NAME + hiveColumnType.toJavaSQLType(), // DATA_TYPE + hiveColumnType.getName(), // TYPE_NAME + getColumnSize(hiveColumnType, flinkColumnType), // COLUMN_SIZE + null, // BUFFER_LENGTH, unused + getDecimalDigits( + hiveColumnType, flinkColumnType), // DECIMAL_DIGITS + hiveColumnType.getNumPrecRadix(), // NUM_PREC_RADIX + flinkColumnType.isNullable() + ? DatabaseMetaData.columnNullable + : DatabaseMetaData.columnNoNulls, // NULLABLE + column.getComment().orElse(""), // REMARKS + null, // COLUMN_DEF + null, // SQL_DATA_TYPE + null, // SQL_DATETIME_SUB + null, // CHAR_OCTET_LENGTH + i + 1, // ORDINAL_POSITION + flinkColumnType.isNullable() ? "YES" : "NO", // IS_NULLABLE + null, // SCOPE_CATALOG + null, // SCOPE_SCHEMA + null, // SCOPE_TABLE + null, // SOURCE_DATA_TYPE + "NO", // IS_AUTO_INCREMENT + }); + } + } + } + } + return buildResultSet( + GET_COLUMNS_SCHEMA, + rowData.stream().map(OperationExecutorFactory::wrap).collect(Collectors.toList())); Review Comment: Do we need to sort the results? ########## flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java: ########## @@ -247,165 +248,195 @@ public void testGetTables() throws Exception { null, new String[] {"MANAGED_TABLE", "VIRTUAL_VIEW"}), getExpectedGetTablesOperationSchema(), + Arrays.asList( + Arrays.asList("default_catalog", "db_test1", "tbl_1", "TABLE", ""), + Arrays.asList("default_catalog", "db_test1", "tbl_2", "TABLE", ""), + Arrays.asList("default_catalog", "db_test1", "tbl_3", "VIEW", ""), + Arrays.asList("default_catalog", "db_test1", "tbl_4", "VIEW", ""), + Arrays.asList("default_catalog", "db_test2", "tbl_1", "TABLE", ""), + Arrays.asList("default_catalog", "db_test2", "diff_1", "TABLE", ""), + Arrays.asList("default_catalog", "db_test2", "tbl_2", "VIEW", ""), + Arrays.asList("default_catalog", "db_test2", "diff_2", "VIEW", ""), + Arrays.asList("default_catalog", "db_diff", "tbl_1", "TABLE", ""), + Arrays.asList("default_catalog", "db_diff", "tbl_2", "VIEW", ""))); + } + + @Test + public void testGetTablesWithPattern() throws Exception { + runGetObjectTest( + connection -> + connection + .getMetaData() + .getTables( + "default_catalog", + "db\\_test_", + "tbl%", + new String[] {"VIRTUAL_VIEW"}), + getExpectedGetTablesOperationSchema(), + Arrays.asList( + Arrays.asList("default_catalog", "db_test1", "tbl_3", "VIEW", ""), + Arrays.asList("default_catalog", "db_test1", "tbl_4", "VIEW", ""), + Arrays.asList("default_catalog", "db_test2", "tbl_2", "VIEW", ""))); + } + + @Test + public void testGetTableTypes() throws Exception { + runGetObjectTest( + connection -> connection.getMetaData().getTableTypes(), + ResolvedSchema.of(Column.physical("TABLE_TYPE", DataTypes.STRING())), + getActualGetTableTypesOperationResults()); + } + + @Test + void testGetColumns() throws Exception { + runGetObjectTest( + connection -> connection.getMetaData().getColumns(null, null, null, null), + getExpectedGetColumnsOperationSchema(), Arrays.asList( Arrays.asList( "default_catalog", "db_test1", "tbl_1", - "TABLE", + "user", + -5, Review Comment: Don't use magic number here. Very hard for others to understand the meaning. ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ########## @@ -127,16 +143,30 @@ public static Callable<ResultSet> createGetTableInfoExecutor() { .collect(Collectors.toList())); } + public static Callable<ResultSet> createGetColumnsExecutor( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName, + @Nullable String tableName, + @Nullable String columnsName) { + return () -> + executeGetColumns( + service, sessionHandle, catalogName, schemaName, tableName, columnsName); + } + + public static Callable<ResultSet> createGetTableTypes() { + return OperationExecutorFactory::executeGetTableTypes; Review Comment: It's more clear ``` return () -> buildResultSet( GET_TABLE_TYPES_SCHEMA, Arrays.stream(TableKind.values()) .map(kind -> wrap(kind.name())) .collect(Collectors.toList())); ``` ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/GetColumnsResult.java: ########## @@ -0,0 +1,225 @@ +package org.apache.flink.table.endpoint.hive.util; + +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; + +import org.apache.hadoop.hive.serde2.thrift.Type; + +import javax.annotation.Nullable; + +import java.sql.DatabaseMetaData; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +// todo need improvement +public class GetColumnsResult { + + public static Builder newBuilder(ObjectIdentifier objectIdentifier) { + return new Builder( + objectIdentifier.getCatalogName(), + objectIdentifier.getDatabaseName(), + objectIdentifier.getObjectName()); + } + + public static class Builder { + private final String catalogName; + private final String databaseName; + private final String tableName; + private String columnName; + private int sqlType; + private String typeName; + private Integer columnSize; + private Integer decimalDigits; + private Integer radix; + private Integer nullable; + private String remarks; + private int position; + private String isNullable; + private String isAutoIncrement; + + private Builder(String catalogName, String databaseName, String tableName) { + this.catalogName = catalogName; + this.databaseName = databaseName; + this.tableName = tableName; + } + + public Builder withColumnName(String columnName) { + this.columnName = columnName; + return this; + } + + public Builder withSQLType(int sqlType) { + this.sqlType = sqlType; + return this; + } + + public Builder withTypeName(String typeName) { + this.typeName = typeName; + return this; + } + + public Builder withColumnSize(Integer columnSize) { + this.columnSize = columnSize; + return this; + } + + public Builder withDecimalDigits(Integer decimalDigits) { + this.decimalDigits = decimalDigits; + return this; + } + + public Builder withRadix(Integer radix) { + this.radix = radix; + return this; + } + + public Builder withNullable(boolean isNullable) { + this.nullable = + isNullable ? DatabaseMetaData.columnNullable : DatabaseMetaData.columnNoNulls; + return this; + } + + public Builder withRemarks(String remarks) { + this.remarks = remarks; + return this; + } + + public Builder withPosition(int position) { + this.position = position; + return this; + } + + public Builder withIsNullable(boolean isNullable) { + this.isNullable = isNullable ? "YES" : "NO"; + return this; + } + + public Builder withIsAutoIncrement(boolean isAutoIncrement) { + this.isAutoIncrement = isAutoIncrement ? "YES" : "NO"; + return this; + } + + public Builder withColumn(Column column) { + this.columnName = column.getName(); + this.remarks = column.getComment().orElse(null); + + withFlinkColumnType(column.getDataType().getLogicalType()); + withHiveColumnType( + Type.getType(HiveTypeUtil.toHiveTypeInfo(column.getDataType(), false))); + + return this; + } + + public void withFlinkColumnType(LogicalType flinkColumnType) { + withColumnSize(getColumnSize(flinkColumnType)); + withDecimalDigits(getDecimalDigits(flinkColumnType)); + withNullable(flinkColumnType.isNullable()); + withIsNullable(flinkColumnType.isNullable()); + } + + public void withHiveColumnType(Type hiveColumnType) { + withSQLType(hiveColumnType.toJavaSQLType()); + withTypeName(hiveColumnType.getName()); + withRadix(hiveColumnType.getNumPrecRadix()); + } + + public List<Object> buildToTestResult() { + return Arrays.stream(build()).filter(Objects::nonNull).collect(Collectors.toList()); + } Review Comment: ? ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ########## @@ -203,10 +229,98 @@ private static ResultSet executeGetTables( .collect(Collectors.toList())); } + private static ResultSet executeGetColumns( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName, + @Nullable String tableName, + @Nullable String columnName) { + String specifiedCatalogName = + isNullOrEmpty(catalogName) ? service.getCurrentCatalog(sessionHandle) : catalogName; + Set<String> schemaNames = + filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName); + Set<TableKind> tableKinds = new HashSet<>(Arrays.asList(TableKind.TABLE, TableKind.VIEW)); + List<Object[]> rowData = new ArrayList<>(); + + for (String schema : schemaNames) { + Set<TableInfo> tableInfos = + filter( + service.listTables( + sessionHandle, specifiedCatalogName, schema, tableKinds), + candidates -> candidates.getIdentifier().getObjectName(), + tableName); + + for (TableInfo tableInfo : tableInfos) { + ResolvedCatalogBaseTable<?> table = + service.getTable(sessionHandle, tableInfo.getIdentifier()); + List<Column> columns = table.getResolvedSchema().getColumns(); + + Set<String> requiredColumnNames = + filter( + new HashSet<>(table.getResolvedSchema().getColumnNames()), + columnName); + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + if (requiredColumnNames.contains(column.getName())) { + LogicalType flinkColumnType = column.getDataType().getLogicalType(); + Type hiveColumnType = + Type.getType( + HiveTypeUtil.toHiveTypeInfo(column.getDataType(), false)); + rowData.add( + new Object[] { + specifiedCatalogName, // TABLE_CAT + tableInfo.getIdentifier().getDatabaseName(), // TABLE_SCHEMA + tableInfo.getIdentifier().getObjectName(), // TABLE_NAME + column.getName(), // COLUMN_NAME + hiveColumnType.toJavaSQLType(), // DATA_TYPE + hiveColumnType.getName(), // TYPE_NAME + getColumnSize(hiveColumnType, flinkColumnType), // COLUMN_SIZE + null, // BUFFER_LENGTH, unused + getDecimalDigits( + hiveColumnType, flinkColumnType), // DECIMAL_DIGITS + hiveColumnType.getNumPrecRadix(), // NUM_PREC_RADIX + flinkColumnType.isNullable() + ? DatabaseMetaData.columnNullable + : DatabaseMetaData.columnNoNulls, // NULLABLE + column.getComment().orElse(""), // REMARKS Review Comment: I think we can return null if the comment doesn't exist. ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/GetColumnsResult.java: ########## @@ -0,0 +1,225 @@ +package org.apache.flink.table.endpoint.hive.util; + +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; + +import org.apache.hadoop.hive.serde2.thrift.Type; + +import javax.annotation.Nullable; + +import java.sql.DatabaseMetaData; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +// todo need improvement Review Comment: Add java doc and remove this. ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ########## @@ -223,6 +261,101 @@ private static ResultSet executeGetTables( .collect(Collectors.toList())); } + private static ResultSet executeGetColumns( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName, + @Nullable String tableName, + @Nullable String columnName) { + String specifiedCatalogName = + isNullOrEmpty(catalogName) ? service.getCurrentCatalog(sessionHandle) : catalogName; + Set<String> schemaNames = + filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName); + Set<TableKind> tableKinds = new HashSet<>(Arrays.asList(TableKind.values())); + List<RowData> results = new ArrayList<>(); + + for (String schema : schemaNames) { + Set<TableInfo> tableInfos = + filter( + service.listTables( + sessionHandle, specifiedCatalogName, schema, tableKinds), + candidates -> candidates.getIdentifier().getObjectName(), + tableName); + + for (TableInfo tableInfo : tableInfos) { + ResolvedCatalogBaseTable<?> table = + service.getTable(sessionHandle, tableInfo.getIdentifier()); + List<Column> columns = table.getResolvedSchema().getColumns(); + + Set<String> matchedColumnNames = + filter( + new HashSet<>(table.getResolvedSchema().getColumnNames()), + columnName); + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + if (!matchedColumnNames.contains(column.getName())) { + continue; + } + results.add( + wrap( + GetColumnsResult.newBuilder(tableInfo.getIdentifier()) + .withColumn(column) + .withPosition(i + 1) + .withIsAutoIncrement(false))); + } + } + } + return buildResultSet(GET_COLUMNS_SCHEMA, results); + } + + private static ResultSet executeGetPrimaryKeys( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName, + @Nullable String tableName) { + String specifiedCatalogName = + isNullOrEmpty(catalogName) ? service.getCurrentCatalog(sessionHandle) : catalogName; + Set<String> schemaNames = + filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName); + List<RowData> primaryKeyRowData = new ArrayList<>(); Review Comment: nit: primaryKeyRowData -> primaryKeies ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ########## @@ -223,6 +261,101 @@ private static ResultSet executeGetTables( .collect(Collectors.toList())); } + private static ResultSet executeGetColumns( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName, + @Nullable String tableName, + @Nullable String columnName) { + String specifiedCatalogName = + isNullOrEmpty(catalogName) ? service.getCurrentCatalog(sessionHandle) : catalogName; + Set<String> schemaNames = + filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName); + Set<TableKind> tableKinds = new HashSet<>(Arrays.asList(TableKind.values())); + List<RowData> results = new ArrayList<>(); + + for (String schema : schemaNames) { + Set<TableInfo> tableInfos = + filter( + service.listTables( + sessionHandle, specifiedCatalogName, schema, tableKinds), + candidates -> candidates.getIdentifier().getObjectName(), + tableName); + + for (TableInfo tableInfo : tableInfos) { + ResolvedCatalogBaseTable<?> table = + service.getTable(sessionHandle, tableInfo.getIdentifier()); + List<Column> columns = table.getResolvedSchema().getColumns(); + + Set<String> matchedColumnNames = + filter( + new HashSet<>(table.getResolvedSchema().getColumnNames()), + columnName); + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + if (!matchedColumnNames.contains(column.getName())) { + continue; + } + results.add( + wrap( + GetColumnsResult.newBuilder(tableInfo.getIdentifier()) + .withColumn(column) + .withPosition(i + 1) + .withIsAutoIncrement(false))); Review Comment: sort no need for `withIsAutoIncrement` ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java: ########## @@ -468,6 +470,17 @@ public void testListUserDefinedFunctions() { "table_func0")))); } + @Test + public void testGetTable() throws Exception { + SessionHandle sessionHandle = createInitializedSession(); + // test get literally table + assertThat(service.getTable(sessionHandle, ObjectIdentifier.of("cat1", "db1", "tbl1"))) + .isInstanceOf(ResolvedCatalogTable.class); + // test get view + assertThat(service.getTable(sessionHandle, ObjectIdentifier.of("cat1", "db1", "tbl3"))) + .isInstanceOf(ResolvedCatalogView.class); + } Review Comment: I think we can remove this it doesn't have much meaning. ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/GetColumnsResult.java: ########## @@ -0,0 +1,225 @@ +package org.apache.flink.table.endpoint.hive.util; + +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; + +import org.apache.hadoop.hive.serde2.thrift.Type; + +import javax.annotation.Nullable; + +import java.sql.DatabaseMetaData; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +// todo need improvement +public class GetColumnsResult { + + public static Builder newBuilder(ObjectIdentifier objectIdentifier) { + return new Builder( + objectIdentifier.getCatalogName(), + objectIdentifier.getDatabaseName(), + objectIdentifier.getObjectName()); + } + + public static class Builder { + private final String catalogName; + private final String databaseName; + private final String tableName; + private String columnName; + private int sqlType; + private String typeName; + private Integer columnSize; + private Integer decimalDigits; + private Integer radix; + private Integer nullable; + private String remarks; + private int position; + private String isNullable; + private String isAutoIncrement; + + private Builder(String catalogName, String databaseName, String tableName) { + this.catalogName = catalogName; + this.databaseName = databaseName; + this.tableName = tableName; + } + + public Builder withColumnName(String columnName) { + this.columnName = columnName; + return this; + } + + public Builder withSQLType(int sqlType) { + this.sqlType = sqlType; + return this; + } + + public Builder withTypeName(String typeName) { + this.typeName = typeName; + return this; + } + + public Builder withColumnSize(Integer columnSize) { + this.columnSize = columnSize; + return this; + } + + public Builder withDecimalDigits(Integer decimalDigits) { + this.decimalDigits = decimalDigits; + return this; + } + + public Builder withRadix(Integer radix) { + this.radix = radix; + return this; + } + + public Builder withNullable(boolean isNullable) { + this.nullable = + isNullable ? DatabaseMetaData.columnNullable : DatabaseMetaData.columnNoNulls; + return this; + } + + public Builder withRemarks(String remarks) { + this.remarks = remarks; + return this; + } + + public Builder withPosition(int position) { + this.position = position; + return this; + } + + public Builder withIsNullable(boolean isNullable) { + this.isNullable = isNullable ? "YES" : "NO"; + return this; + } + + public Builder withIsAutoIncrement(boolean isAutoIncrement) { + this.isAutoIncrement = isAutoIncrement ? "YES" : "NO"; + return this; + } Review Comment: Remove this. Flink doesn't have auto increment type. ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/GetColumnsResult.java: ########## @@ -0,0 +1,225 @@ +package org.apache.flink.table.endpoint.hive.util; Review Comment: license ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ########## @@ -111,6 +118,27 @@ public static Callable<ResultSet> createGetTablesExecutor( service, sessionHandle, catalogName, schemaName, tableName, tableKinds); } + public static Callable<ResultSet> createGetColumnsExecutor( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName, + @Nullable String tableName, + @Nullable String columnsName) { + return () -> + executeGetColumns( + service, sessionHandle, catalogName, schemaName, tableName, columnsName); + } + + public static Callable<ResultSet> createGetTableTypesExecutor() { + return () -> + buildResultSet( + GET_TABLE_TYPES_SCHEMA, + Arrays.stream(TableKind.values()) Review Comment: Please make sure it's with the order TABLE, VIEW ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/OperationExecutorFactory.java: ########## @@ -223,6 +261,101 @@ private static ResultSet executeGetTables( .collect(Collectors.toList())); } + private static ResultSet executeGetColumns( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName, + @Nullable String tableName, + @Nullable String columnName) { + String specifiedCatalogName = + isNullOrEmpty(catalogName) ? service.getCurrentCatalog(sessionHandle) : catalogName; + Set<String> schemaNames = + filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName); + Set<TableKind> tableKinds = new HashSet<>(Arrays.asList(TableKind.values())); + List<RowData> results = new ArrayList<>(); + + for (String schema : schemaNames) { + Set<TableInfo> tableInfos = + filter( + service.listTables( + sessionHandle, specifiedCatalogName, schema, tableKinds), + candidates -> candidates.getIdentifier().getObjectName(), + tableName); + + for (TableInfo tableInfo : tableInfos) { + ResolvedCatalogBaseTable<?> table = + service.getTable(sessionHandle, tableInfo.getIdentifier()); + List<Column> columns = table.getResolvedSchema().getColumns(); + + Set<String> matchedColumnNames = + filter( + new HashSet<>(table.getResolvedSchema().getColumnNames()), + columnName); + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + if (!matchedColumnNames.contains(column.getName())) { + continue; + } + results.add( + wrap( + GetColumnsResult.newBuilder(tableInfo.getIdentifier()) + .withColumn(column) + .withPosition(i + 1) + .withIsAutoIncrement(false))); + } + } + } + return buildResultSet(GET_COLUMNS_SCHEMA, results); + } + + private static ResultSet executeGetPrimaryKeys( + SqlGatewayService service, + SessionHandle sessionHandle, + @Nullable String catalogName, + @Nullable String schemaName, + @Nullable String tableName) { + String specifiedCatalogName = + isNullOrEmpty(catalogName) ? service.getCurrentCatalog(sessionHandle) : catalogName; + Set<String> schemaNames = + filter(service.listDatabases(sessionHandle, specifiedCatalogName), schemaName); + List<RowData> primaryKeyRowData = new ArrayList<>(); + + for (String schema : schemaNames) { + Set<TableInfo> tableInfos = + filter( + service.listTables( + sessionHandle, + specifiedCatalogName, + schema, + new HashSet<>(Arrays.asList(TableKind.values()))), + candidate -> candidate.getIdentifier().getObjectName(), + tableName); + + for (TableInfo tableInfo : tableInfos) { + ResolvedCatalogBaseTable<?> table = + service.getTable(sessionHandle, tableInfo.getIdentifier()); + UniqueConstraint primaryKey = + table.getResolvedSchema().getPrimaryKey().orElse(null); + if (primaryKey == null) { + continue; + } + + for (int i = 0; i < primaryKey.getColumns().size(); i++) { + primaryKeyRowData.add( + wrap( + specifiedCatalogName, + tableInfo.getIdentifier().getDatabaseName(), + tableInfo.getIdentifier().getObjectName(), + primaryKey.getColumns().get(i), + i + 1, + primaryKey.getName())); Review Comment: sort by column name ########## flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/util/GetColumnsResult.java: ########## @@ -0,0 +1,225 @@ +package org.apache.flink.table.endpoint.hive.util; + +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.hive.util.HiveTypeUtil; +import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.LogicalType; + +import org.apache.hadoop.hive.serde2.thrift.Type; + +import javax.annotation.Nullable; + +import java.sql.DatabaseMetaData; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +// todo need improvement +public class GetColumnsResult { + + public static Builder newBuilder(ObjectIdentifier objectIdentifier) { + return new Builder( + objectIdentifier.getCatalogName(), + objectIdentifier.getDatabaseName(), + objectIdentifier.getObjectName()); + } + + public static class Builder { + private final String catalogName; + private final String databaseName; + private final String tableName; + private String columnName; + private int sqlType; + private String typeName; + private Integer columnSize; + private Integer decimalDigits; + private Integer radix; + private Integer nullable; + private String remarks; + private int position; + private String isNullable; + private String isAutoIncrement; + + private Builder(String catalogName, String databaseName, String tableName) { + this.catalogName = catalogName; + this.databaseName = databaseName; + this.tableName = tableName; + } + + public Builder withColumnName(String columnName) { + this.columnName = columnName; + return this; + } + + public Builder withSQLType(int sqlType) { + this.sqlType = sqlType; + return this; + } + + public Builder withTypeName(String typeName) { + this.typeName = typeName; + return this; + } + + public Builder withColumnSize(Integer columnSize) { + this.columnSize = columnSize; + return this; + } + + public Builder withDecimalDigits(Integer decimalDigits) { + this.decimalDigits = decimalDigits; + return this; + } + + public Builder withRadix(Integer radix) { + this.radix = radix; + return this; + } + + public Builder withNullable(boolean isNullable) { + this.nullable = + isNullable ? DatabaseMetaData.columnNullable : DatabaseMetaData.columnNoNulls; + return this; + } + + public Builder withRemarks(String remarks) { + this.remarks = remarks; + return this; + } + + public Builder withPosition(int position) { + this.position = position; + return this; + } + + public Builder withIsNullable(boolean isNullable) { + this.isNullable = isNullable ? "YES" : "NO"; + return this; + } + + public Builder withIsAutoIncrement(boolean isAutoIncrement) { + this.isAutoIncrement = isAutoIncrement ? "YES" : "NO"; + return this; + } + + public Builder withColumn(Column column) { + this.columnName = column.getName(); + this.remarks = column.getComment().orElse(null); + + withFlinkColumnType(column.getDataType().getLogicalType()); + withHiveColumnType( + Type.getType(HiveTypeUtil.toHiveTypeInfo(column.getDataType(), false))); + + return this; + } + + public void withFlinkColumnType(LogicalType flinkColumnType) { + withColumnSize(getColumnSize(flinkColumnType)); + withDecimalDigits(getDecimalDigits(flinkColumnType)); + withNullable(flinkColumnType.isNullable()); + withIsNullable(flinkColumnType.isNullable()); + } + + public void withHiveColumnType(Type hiveColumnType) { + withSQLType(hiveColumnType.toJavaSQLType()); + withTypeName(hiveColumnType.getName()); + withRadix(hiveColumnType.getNumPrecRadix()); + } + + public List<Object> buildToTestResult() { + return Arrays.stream(build()).filter(Objects::nonNull).collect(Collectors.toList()); + } + + public Object[] build() { + return new Object[] { + catalogName, + databaseName, + tableName, + columnName, + sqlType, + typeName, + columnSize, + null, // BUFFER_LENGTH, unused + decimalDigits, + radix, + nullable, + remarks, + null, // COLUMN_DEF + null, // SQL_DATA_TYPE + null, // SQL_DATETIME_SUB + null, // CHAR_OCTET_LENGTH + position, + isNullable, + null, // SCOPE_CATALOG + null, // SCOPE_SCHEMA + null, // SCOPE_TABLE + null, // SOURCE_DATA_TYPE + isAutoIncrement + }; + } + } + + // TODO mapping + /** + * The column size for this type. For numeric data this is the maximum precision. For character + * data this is the length in characters. For datetime types this is the length in characters of + * the String representation (assuming the maximum allowed precision of the fractional seconds + * component). For binary data this is the length in bytes. Null is returned for data types + * where the column size is not applicable. + * + * <p>Currently only supports xxx. Review Comment: add more info -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org