dawidwys commented on a change in pull request #7777:  [FLINK-9964][table] Add 
a full RFC-compliant CSV table format factory
URL: https://github.com/apache/flink/pull/7777#discussion_r259415324
 
 

 ##########
 File path: 
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDeserializationSchema.java
 ##########
 @@ -209,91 +213,157 @@ public int hashCode() {
 
        // 
--------------------------------------------------------------------------------------------
 
-       private Object convert(JsonNode node, TypeInformation<?> info) {
-               if (info == Types.VOID || node.isNull()) {
-                       return null;
-               } else if (info == Types.STRING) {
-                       return node.asText();
-               } else if (info == Types.BOOLEAN) {
-                       return Boolean.valueOf(node.asText().trim());
-               } else if (info == Types.BYTE) {
-                       return Byte.valueOf(node.asText().trim());
-               } else if (info == Types.SHORT) {
-                       return Short.valueOf(node.asText().trim());
-               } else if (info == Types.INT) {
-                       return Integer.valueOf(node.asText().trim());
-               } else if (info == Types.LONG) {
-                       return Long.valueOf(node.asText().trim());
-               } else if (info == Types.FLOAT) {
-                       return Float.valueOf(node.asText().trim());
-               } else if (info == Types.DOUBLE) {
-                       return Double.valueOf(node.asText().trim());
-               } else if (info == Types.BIG_DEC) {
-                       return new BigDecimal(node.asText().trim());
-               } else if (info == Types.BIG_INT) {
-                       return new BigInteger(node.asText().trim());
-               } else if (info == Types.SQL_DATE) {
-                       return Date.valueOf(node.asText());
-               } else if (info == Types.SQL_TIME) {
-                       return Time.valueOf(node.asText());
-               } else if (info == Types.SQL_TIMESTAMP) {
-                       return Timestamp.valueOf(node.asText());
+       private interface RuntimeConverter extends Serializable {
+               Object convert(JsonNode node);
+       }
+
+       private static RuntimeConverter createRowRuntimeConverter(
+                       RowTypeInfo rowTypeInfo,
+                       boolean ignoreParseErrors,
+                       boolean isTopLevel) {
+               final int rowArity = rowTypeInfo.getArity();
+               final TypeInformation<?>[] fieldTypes = 
rowTypeInfo.getFieldTypes();
+               final String[] fieldNames = rowTypeInfo.getFieldNames();
+
+               final RuntimeConverter[] fieldConverters = new 
RuntimeConverter[fieldTypes.length];
+               for (int i = 0; i < fieldTypes.length; i++) {
+                       final TypeInformation<?> fieldType = fieldTypes[i];
+                       final RuntimeConverter fieldConverter =
+                               createNullableRuntimeConverter(fieldType, 
ignoreParseErrors);
+
+                       fieldConverters[i] = (node) -> {
+                               try {
+                                       return fieldConverter.convert(node);
+                               } catch (Throwable t) {
+                                       if (!ignoreParseErrors) {
+                                               throw t;
+                                       }
+                                       return null;
+                               }
+                       };
+               }
+
+               // Jackson only supports mapping by name in the first level
+               if (isTopLevel) {
+                       return (node) -> {
 
 Review comment:
   Could we extract common class for those branches? The only difference is how 
we extract children of `JsonNode`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to