This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 3e64a42838 [Fix][File]use common-csv to read csv file (#8919) 3e64a42838 is described below commit 3e64a4283874aea263d2f9344715096d44fea617 Author: litiliu <38579068+liti...@users.noreply.github.com> AuthorDate: Wed Mar 19 21:23:05 2025 +0800 [Fix][File]use common-csv to read csv file (#8919) Co-authored-by: litiliu <liti...@cisco.com> --- .../file/source/reader/CsvReadStrategy.java | 101 +++++++++++---------- .../e2e/connector/file/local/LocalFileIT.java | 6 ++ .../src/test/resources/csv/break_line.csv | 3 + .../resources/csv/breakline_csv_to_assert.conf | 51 +++++++++++ .../format/csv/CsvDeserializationSchema.java | 15 +-- 5 files changed, 124 insertions(+), 52 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java index 5fabc89110..cc5cb8820e 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java @@ -19,7 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.file.source.reader; import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; @@ -39,6 +38,10 @@ import org.apache.seatunnel.format.csv.constant.CsvFormatConstant; import 
org.apache.seatunnel.format.csv.processor.CsvLineProcessor; import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; + import io.airlift.compress.lzo.LzopCodec; import lombok.extern.slf4j.Slf4j; @@ -47,12 +50,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; import java.util.Optional; @Slf4j public class CsvReadStrategy extends AbstractReadStrategy { - private DeserializationSchema<SeaTunnelRow> deserializationSchema; + private CsvDeserializationSchema deserializationSchema; private String fieldDelimiter = BaseSourceConfigOptions.FIELD_DELIMITER.defaultValue(); private DateUtils.Formatter dateFormat = BaseSourceConfigOptions.DATE_FORMAT.defaultValue(); private DateTimeUtils.Formatter datetimeFormat = @@ -62,6 +66,7 @@ public class CsvReadStrategy extends AbstractReadStrategy { private CsvLineProcessor processor; private int[] indexes; private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue(); + private CatalogTable inputCatalogTable; @Override public void read(String path, String tableId, Collector<SeaTunnelRow> output) @@ -96,51 +101,54 @@ public class CsvReadStrategy extends AbstractReadStrategy { break; } + CSVFormat csvFormat = CSVFormat.DEFAULT; try (BufferedReader reader = - new BufferedReader(new InputStreamReader(actualInputStream, encoding))) { - reader.lines() - .skip(skipHeaderNumber) - .forEach( - line -> { - try { - SeaTunnelRow seaTunnelRow = - deserializationSchema.deserialize( - line.getBytes(StandardCharsets.UTF_8)); - if (!readColumns.isEmpty()) { - // need column projection - Object[] fields; - if (isMergePartition) { - fields = - new Object - [readColumns.size() - + partitionsMap.size()]; - } else { - fields = new Object[readColumns.size()]; - } - for 
(int i = 0; i < indexes.length; i++) { - fields[i] = seaTunnelRow.getField(indexes[i]); - } - seaTunnelRow = new SeaTunnelRow(fields); - } - if (isMergePartition) { - int index = seaTunnelRowType.getTotalFields(); - for (String value : partitionsMap.values()) { - seaTunnelRow.setField(index++, value); - } - } - seaTunnelRow.setTableId(tableId); - output.collect(seaTunnelRow); - } catch (IOException e) { - String errorMsg = - String.format( - "Deserialize this data [%s] failed, please check the origin data", - line); - throw new FileConnectorException( - FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, - errorMsg, - e); - } - }); + new BufferedReader(new InputStreamReader(actualInputStream, encoding)); + CSVParser csvParser = new CSVParser(reader, csvFormat); ) { + for (int i = 0; i < skipHeaderNumber; i++) { + if (reader.readLine() == null) { + throw new IOException( + String.format( + "File [%s] has fewer lines than expected to skip.", + currentFileName)); + } + } + // read lines + for (CSVRecord csvRecord : csvParser) { + HashMap<Integer, String> fieldIdValueMap = new HashMap<>(); + for (int i = 0; i < inputCatalogTable.getTableSchema().getColumns().size(); i++) { + fieldIdValueMap.put(i, csvRecord.get(i)); + } + SeaTunnelRow seaTunnelRow = deserializationSchema.getSeaTunnelRow(fieldIdValueMap); + if (!readColumns.isEmpty()) { + // need column projection + Object[] fields; + if (isMergePartition) { + fields = new Object[readColumns.size() + partitionsMap.size()]; + } else { + fields = new Object[readColumns.size()]; + } + for (int i = 0; i < indexes.length; i++) { + fields[i] = seaTunnelRow.getField(indexes[i]); + } + seaTunnelRow = new SeaTunnelRow(fields); + } + if (isMergePartition) { + int index = seaTunnelRowType.getTotalFields(); + for (String value : partitionsMap.values()) { + seaTunnelRow.setField(index++, value); + } + } + seaTunnelRow.setTableId(tableId); + output.collect(seaTunnelRow); + } + } catch (IOException e) { + String errorMsg = + 
String.format( + "Deserialize this file [%s] failed, please check the origin data", + currentFileName); + throw new FileConnectorException( + FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, errorMsg, e); } } @@ -177,6 +185,7 @@ public class CsvReadStrategy extends AbstractReadStrategy { @Override public void setCatalogTable(CatalogTable catalogTable) { SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType(); + this.inputCatalogTable = catalogTable; SeaTunnelRowType userDefinedRowTypeWithPartition = mergePartitionTypes(fileNames.get(0), rowType); ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java index c31e2e8560..fc69268584 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java @@ -286,6 +286,11 @@ public class LocalFileIT extends TestSuiteBase { "/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx", container); + ContainerUtil.copyFileIntoContainers( + "/csv/break_line.csv", + "/seatunnel/read/csv/break_line/break_line.csv", + container); + ContainerUtil.copyFileIntoContainers( "/text/e2e_null_format.txt", "/seatunnel/read/e2e_null_format/e2e_null_format.txt", @@ -300,6 +305,7 @@ public class LocalFileIT extends TestSuiteBase { TestHelper helper = new TestHelper(container); helper.execute("/csv/fake_to_local_csv.conf"); helper.execute("/csv/local_csv_to_assert.conf"); + helper.execute("/csv/breakline_csv_to_assert.conf"); helper.execute("/excel/fake_to_local_excel.conf"); 
helper.execute("/excel/local_excel_to_assert.conf"); helper.execute("/excel/local_excel_projection_to_assert.conf"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/break_line.csv b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/break_line.csv new file mode 100644 index 0000000000..f534c7811c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/break_line.csv @@ -0,0 +1,3 @@ +20,"harry + potter" +21,"tom" \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/breakline_csv_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/breakline_csv_to_assert.conf new file mode 100644 index 0000000000..7a0c3caf65 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/breakline_csv_to_assert.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + LocalFile { + path = "/seatunnel/read/csv/break_line" + file_format_type = csv + schema = { + fields { + age = int + name = string + } + } + } +} + +sink { + Assert { + rules { + row_rules = [ + { + rule_type = MAX_ROW + rule_value = 2 + } + { + rule_type = MIN_ROW + rule_value = 2 + } + ] + } + } +} \ No newline at end of file diff --git a/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java b/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java index 551bba12b0..df4dc05cd6 100644 --- a/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.format.csv; -import org.apache.seatunnel.api.serialization.DeserializationSchema; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.ArrayType; @@ -41,6 +40,7 @@ import org.apache.commons.lang3.StringUtils; import lombok.NonNull; import java.io.IOException; +import java.io.Serializable; import java.math.BigDecimal; import java.nio.charset.StandardCharsets; import java.time.LocalDate; @@ -58,7 +58,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; -public class CsvDeserializationSchema implements DeserializationSchema<SeaTunnelRow> { +public class CsvDeserializationSchema implements Serializable { private final SeaTunnelRowType seaTunnelRowType; private final String[] separators; private final String encoding; @@ -169,13 +169,17 @@ public class CsvDeserializationSchema implements DeserializationSchema<SeaTunnel } } - @Override - public SeaTunnelRow deserialize(byte[] message) 
throws IOException { + protected SeaTunnelRow deserialize(byte[] message) throws IOException { if (message == null || message.length == 0) { return null; } String content = new String(message, EncodingUtils.tryParseCharset(encoding)); Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType, 0); + SeaTunnelRow seaTunnelRow = getSeaTunnelRow(splitsMap); + return seaTunnelRow; + } + + public SeaTunnelRow getSeaTunnelRow(Map<Integer, String> splitsMap) { Object[] objects = new Object[seaTunnelRowType.getTotalFields()]; for (int i = 0; i < objects.length; i++) { String fieldValue = splitsMap.get(i); @@ -201,12 +205,11 @@ public class CsvDeserializationSchema implements DeserializationSchema<SeaTunnel return seaTunnelRow; } - @Override public SeaTunnelDataType<SeaTunnelRow> getProducedType() { return seaTunnelRowType; } - public Map<Integer, String> splitLineBySeaTunnelRowType( + protected Map<Integer, String> splitLineBySeaTunnelRowType( String line, SeaTunnelRowType seaTunnelRowType, int level) { String[] splits = processor.splitLine(line, separators[level]); LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();