dawidwys commented on a change in pull request #7777: [FLINK-9964][table] Add a full RFC-compliant CSV table format factory
URL: https://github.com/apache/flink/pull/7777#discussion_r259412891
 
 

 ##########
 File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##########
 @@ -209,91 +213,157 @@ public int hashCode() {
 
        // --------------------------------------------------------------------------------------------
 
-       private Object convert(JsonNode node, TypeInformation<?> info) {
-               if (info == Types.VOID || node.isNull()) {
-                       return null;
-               } else if (info == Types.STRING) {
-                       return node.asText();
-               } else if (info == Types.BOOLEAN) {
-                       return Boolean.valueOf(node.asText().trim());
-               } else if (info == Types.BYTE) {
-                       return Byte.valueOf(node.asText().trim());
-               } else if (info == Types.SHORT) {
-                       return Short.valueOf(node.asText().trim());
-               } else if (info == Types.INT) {
-                       return Integer.valueOf(node.asText().trim());
-               } else if (info == Types.LONG) {
-                       return Long.valueOf(node.asText().trim());
-               } else if (info == Types.FLOAT) {
-                       return Float.valueOf(node.asText().trim());
-               } else if (info == Types.DOUBLE) {
-                       return Double.valueOf(node.asText().trim());
-               } else if (info == Types.BIG_DEC) {
-                       return new BigDecimal(node.asText().trim());
-               } else if (info == Types.BIG_INT) {
-                       return new BigInteger(node.asText().trim());
-               } else if (info == Types.SQL_DATE) {
-                       return Date.valueOf(node.asText());
-               } else if (info == Types.SQL_TIME) {
-                       return Time.valueOf(node.asText());
-               } else if (info == Types.SQL_TIMESTAMP) {
-                       return Timestamp.valueOf(node.asText());
+       private interface RuntimeConverter extends Serializable {
+               Object convert(JsonNode node);
+       }
+
+       private static RuntimeConverter createRowRuntimeConverter(
 
 Review comment:
   I would split this method into something like this:
   
        private static RuntimeConverter createRowRuntimeConverter(
                        RowTypeInfo rowTypeInfo,
                        boolean ignoreParseErrors,
                        boolean isTopLevel) {
                final TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
                final String[] fieldNames = rowTypeInfo.getFieldNames();
   
                final RuntimeConverter[] fieldConverters = createFieldConverters(ignoreParseErrors, fieldTypes);
   
                // Jackson only supports mapping by name in the first level
                return assembleRowConverter(ignoreParseErrors, isTopLevel, fieldNames, fieldConverters);
        }

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to