dawidwys commented on a change in pull request #7777: [FLINK-9964][table] Add a full RFC-compliant CSV table format factory URL: https://github.com/apache/flink/pull/7777#discussion_r258583254
########## File path: flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowSerializationSchema.java ########## @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.csv; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectWriter; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ContainerNode; +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Arrays; +import java.util.Objects; + +/** + * Serialization schema that serializes an object of Flink types into CSV bytes. + * + * <p>Serializes the input row into a {@link ObjectNode} and + * converts it into <code>byte[]</code>. + * + * <p>Result <code>byte[]</code> messages can be deserialized using {@link CsvRowDeserializationSchema}. + */ +@PublicEvolving +public final class CsvRowSerializationSchema implements SerializationSchema<Row> { + + private static final long serialVersionUID = 2098447220136965L; + + /** Type information describing the input CSV data. */ + private final TypeInformation<Row> typeInfo; + + /** CsvMapper used to write {@link JsonNode} into bytes. */ + private final CsvMapper csvMapper; + + /** Schema describing the input CSV data. */ + private CsvSchema csvSchema; + + /** Object writer used to write rows. It is configured by {@link CsvSchema}. */ + private ObjectWriter objectWriter; + + /** Reusable object node. */ + private ObjectNode root; + + /** + * Create a {@link CsvRowSerializationSchema} expecting the given {@link TypeInformation}. + * + * @param typeInfo type information used to create the schema. 
+ */ + public CsvRowSerializationSchema(TypeInformation<Row> typeInfo) { + Preconditions.checkNotNull(typeInfo, "Type information must not be null."); + this.typeInfo = typeInfo; + + if (!(typeInfo instanceof RowTypeInfo)) { + throw new IllegalArgumentException("Row type information expected."); + } + + this.csvMapper = new CsvMapper(); + this.csvSchema = CsvRowSchemaConverter.convert((RowTypeInfo) typeInfo); + this.objectWriter = csvMapper.writer(csvSchema); + } + + @Override + public byte[] serialize(Row row) { + if (root == null) { + root = csvMapper.createObjectNode(); + } + try { + convertRow(root, row, (RowTypeInfo) typeInfo); + return objectWriter.writeValueAsBytes(root); + } catch (Throwable t) { + throw new RuntimeException("Could not serialize row '" + row + "'.", t); + } + } + + public void setFieldDelimiter(char c) { + this.csvSchema = this.csvSchema.rebuild().setColumnSeparator(c).build(); + this.objectWriter = objectWriter.with(csvSchema); + } + + public void setLineDelimiter(String delimiter) { + if (!delimiter.equals("\n") && !delimiter.equals("\r") && !delimiter.equals("\r\n")) { + throw new IllegalArgumentException( + "Unsupported new line delimiter. 
Only \\n, \\r, or \\r\\n are supported."); + } + this.csvSchema = this.csvSchema.rebuild().setLineSeparator(delimiter).build(); + this.objectWriter = objectWriter.with(csvSchema); + } + + public void setArrayElementDelimiter(String s) { + this.csvSchema = this.csvSchema.rebuild().setArrayElementSeparator(s).build(); + this.objectWriter = objectWriter.with(csvSchema); + } + + public void setQuoteCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setQuoteChar(c).build(); + this.objectWriter = objectWriter.with(csvSchema); + } + + public void setEscapeCharacter(char c) { + this.csvSchema = this.csvSchema.rebuild().setEscapeChar(c).build(); + this.objectWriter = objectWriter.with(csvSchema); + } + + public void setNullLiteral(String s) { + this.csvSchema = this.csvSchema.rebuild().setNullValue(s).build(); + this.objectWriter = objectWriter.with(csvSchema); + } + + @Override + public boolean equals(Object o) { + if (o == null || o.getClass() != this.getClass()) { + return false; + } + if (this == o) { + return true; + } + final CsvRowSerializationSchema that = (CsvRowSerializationSchema) o; + final CsvSchema otherSchema = that.csvSchema; + + return typeInfo.equals(that.typeInfo) && + csvSchema.getColumnSeparator() == otherSchema.getColumnSeparator() && + Arrays.equals(csvSchema.getLineSeparator(), otherSchema.getLineSeparator()) && + csvSchema.getArrayElementSeparator().equals(otherSchema.getArrayElementSeparator()) && + csvSchema.getQuoteChar() == otherSchema.getQuoteChar() && + csvSchema.getEscapeChar() == otherSchema.getEscapeChar() && + Arrays.equals(csvSchema.getNullValue(), otherSchema.getNullValue()); + } + + @Override + public int hashCode() { + return Objects.hash( + typeInfo, + csvSchema.getColumnSeparator(), + csvSchema.getLineSeparator(), + csvSchema.getArrayElementSeparator(), + csvSchema.getQuoteChar(), + csvSchema.getEscapeChar(), + csvSchema.getNullValue()); + } + + // 
-------------------------------------------------------------------------------------------- + + private void convertRow(ObjectNode reuse, Row row, RowTypeInfo rowTypeInfo) { + if (reuse == null) { + reuse = csvMapper.createObjectNode(); + } + final TypeInformation[] types = rowTypeInfo.getFieldTypes(); + if (row.getArity() != types.length) { + throw new RuntimeException("Row length mismatch. " + types.length + + " fields expected but was " + row.getArity() + "."); + } + + final String[] fields = rowTypeInfo.getFieldNames(); + for (int i = 0; i < types.length; i++) { + final String columnName = fields[i]; + final Object obj = row.getField(i); + reuse.set(columnName, convert(reuse, obj, types[i], false)); + } + } + + /** + * Converts an object to a JsonNode. + * + * @param container {@link ContainerNode} that creates {@link JsonNode}. + * @param obj Object to be converted to {@link JsonNode}. + * @param info Type information that decides the type of {@link JsonNode}. + * @param nested variable that indicates whether the obj is in a nested structure + * like a string in an array. + * @return result after converting. + */ + private JsonNode convert(ContainerNode<?> container, Object obj, TypeInformation<?> info, boolean nested) { Review comment: Unused parameter `nested`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services