xuefuz commented on a change in pull request #9239: [FLINK-13385] Align Hive data type mapping with FLIP-37 URL: https://github.com/apache/flink/pull/9239#discussion_r308349596
########## File path: flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveTypeUtil.java ########## @@ -85,65 +86,68 @@ public static TypeInfo toHiveTypeInfo(DataType dataType) { LogicalTypeRoot type = dataType.getLogicalType().getTypeRoot(); if (dataType instanceof AtomicDataType) { - if (type.equals(LogicalTypeRoot.BOOLEAN)) { - return TypeInfoFactory.booleanTypeInfo; - } else if (type.equals(LogicalTypeRoot.TINYINT)) { - return TypeInfoFactory.byteTypeInfo; - } else if (type.equals(LogicalTypeRoot.SMALLINT)) { - return TypeInfoFactory.shortTypeInfo; - } else if (type.equals(LogicalTypeRoot.INTEGER)) { - return TypeInfoFactory.intTypeInfo; - } else if (type.equals(LogicalTypeRoot.BIGINT)) { - return TypeInfoFactory.longTypeInfo; - } else if (type.equals(LogicalTypeRoot.FLOAT)) { - return TypeInfoFactory.floatTypeInfo; - } else if (type.equals(LogicalTypeRoot.DOUBLE)) { - return TypeInfoFactory.doubleTypeInfo; - } else if (type.equals(LogicalTypeRoot.DATE)) { - return TypeInfoFactory.dateTypeInfo; - } else if (type.equals(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) { - return TypeInfoFactory.timestampTypeInfo; - } else if (type.equals(LogicalTypeRoot.BINARY) || type.equals(LogicalTypeRoot.VARBINARY)) { - // Hive doesn't support variable-length binary string - return TypeInfoFactory.binaryTypeInfo; - } else if (type.equals(LogicalTypeRoot.CHAR)) { - CharType charType = (CharType) dataType.getLogicalType(); - - if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) { - throw new CatalogException( - String.format("HiveCatalog doesn't support char type with length of '%d'. 
" + - "The maximum length is %d", - charType.getLength(), HiveChar.MAX_CHAR_LENGTH)); + switch (type) { + case BOOLEAN: + return TypeInfoFactory.booleanTypeInfo; + case TINYINT: + return TypeInfoFactory.byteTypeInfo; + case SMALLINT: + return TypeInfoFactory.shortTypeInfo; + case INTEGER: + return TypeInfoFactory.intTypeInfo; + case BIGINT: + return TypeInfoFactory.longTypeInfo; + case FLOAT: + return TypeInfoFactory.floatTypeInfo; + case DOUBLE: + return TypeInfoFactory.doubleTypeInfo; + case DATE: + return TypeInfoFactory.dateTypeInfo; + case TIMESTAMP_WITHOUT_TIME_ZONE: + return TypeInfoFactory.timestampTypeInfo; + case CHAR: { + CharType charType = (CharType) dataType.getLogicalType(); + if (charType.getLength() > HiveChar.MAX_CHAR_LENGTH) { + throw new CatalogException( + String.format("HiveCatalog doesn't support char type with length of '%d'. " + + "The maximum length is %d", + charType.getLength(), HiveChar.MAX_CHAR_LENGTH)); + } + return TypeInfoFactory.getCharTypeInfo(charType.getLength()); } - - return TypeInfoFactory.getCharTypeInfo(charType.getLength()); - } else if (type.equals(LogicalTypeRoot.VARCHAR)) { - VarCharType varCharType = (VarCharType) dataType.getLogicalType(); - - // Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE) - // We don't have more information in LogicalTypeRoot to distringuish StringType and a VARCHAR(Integer.MAX_VALUE) instance - // Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType - if (varCharType.getLength() == Integer.MAX_VALUE) { - return TypeInfoFactory.stringTypeInfo; + case VARCHAR: { + VarCharType varCharType = (VarCharType) dataType.getLogicalType(); + // Flink's StringType is defined as VARCHAR(Integer.MAX_VALUE) + // We don't have more information in LogicalTypeRoot to distringuish StringType and a VARCHAR(Integer.MAX_VALUE) instance + // Thus always treat VARCHAR(Integer.MAX_VALUE) as StringType + if (varCharType.getLength() == Integer.MAX_VALUE) { + return TypeInfoFactory.stringTypeInfo; + } + if 
(varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH) { + throw new CatalogException( + String.format("HiveCatalog doesn't support varchar type with length of '%d'. " + + "The maximum length is %d", + varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH)); + } + return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength()); } - - if (varCharType.getLength() > HiveVarchar.MAX_VARCHAR_LENGTH) { - throw new CatalogException( - String.format("HiveCatalog doesn't support varchar type with length of '%d'. " + - "The maximum length is %d", - varCharType.getLength(), HiveVarchar.MAX_VARCHAR_LENGTH)); + case DECIMAL: { + DecimalType decimalType = (DecimalType) dataType.getLogicalType(); + // Flink and Hive share the same precision and scale range + // Flink already validates the type so we don't need to validate again here + return TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale()); } - - return TypeInfoFactory.getVarcharTypeInfo(varCharType.getLength()); - } else if (type.equals(LogicalTypeRoot.DECIMAL)) { - DecimalType decimalType = (DecimalType) dataType.getLogicalType(); - - // Flink and Hive share the same precision and scale range - // Flink already validates the type so we don't need to validate again here - return TypeInfoFactory.getDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale()); + case VARBINARY: { + VarBinaryType varBinaryType = (VarBinaryType) dataType.getLogicalType(); + if (varBinaryType.getLength() == VarBinaryType.MAX_LENGTH) { Review comment: Nit: maybe we can add a comment saying the type is actually DataTypes.BYTES, similar to what is done for VARCHAR. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services