This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3e64a42838 [Fix][File]use common-csv to read csv file (#8919)
3e64a42838 is described below

commit 3e64a4283874aea263d2f9344715096d44fea617
Author: litiliu <38579068+liti...@users.noreply.github.com>
AuthorDate: Wed Mar 19 21:23:05 2025 +0800

    [Fix][File]use common-csv to read csv file (#8919)
    
    Co-authored-by: litiliu <liti...@cisco.com>
---
 .../file/source/reader/CsvReadStrategy.java        | 101 +++++++++++----------
 .../e2e/connector/file/local/LocalFileIT.java      |   6 ++
 .../src/test/resources/csv/break_line.csv          |   3 +
 .../resources/csv/breakline_csv_to_assert.conf     |  51 +++++++++++
 .../format/csv/CsvDeserializationSchema.java       |  15 +--
 5 files changed, 124 insertions(+), 52 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
index 5fabc89110..cc5cb8820e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/CsvReadStrategy.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
@@ -39,6 +38,10 @@ import org.apache.seatunnel.format.csv.constant.CsvFormatConstant;
 import org.apache.seatunnel.format.csv.processor.CsvLineProcessor;
 import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor;
 
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
+
 import io.airlift.compress.lzo.LzopCodec;
 import lombok.extern.slf4j.Slf4j;
 
@@ -47,12 +50,13 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 @Slf4j
 public class CsvReadStrategy extends AbstractReadStrategy {
-    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private CsvDeserializationSchema deserializationSchema;
     private String fieldDelimiter = 
BaseSourceConfigOptions.FIELD_DELIMITER.defaultValue();
     private DateUtils.Formatter dateFormat = 
BaseSourceConfigOptions.DATE_FORMAT.defaultValue();
     private DateTimeUtils.Formatter datetimeFormat =
@@ -62,6 +66,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
     private CsvLineProcessor processor;
     private int[] indexes;
     private String encoding = BaseSourceConfigOptions.ENCODING.defaultValue();
+    private CatalogTable inputCatalogTable;
 
     @Override
     public void read(String path, String tableId, Collector<SeaTunnelRow> 
output)
@@ -96,51 +101,54 @@ public class CsvReadStrategy extends AbstractReadStrategy {
                 break;
         }
 
+        CSVFormat csvFormat = CSVFormat.DEFAULT;
         try (BufferedReader reader =
-                new BufferedReader(new InputStreamReader(actualInputStream, 
encoding))) {
-            reader.lines()
-                    .skip(skipHeaderNumber)
-                    .forEach(
-                            line -> {
-                                try {
-                                    SeaTunnelRow seaTunnelRow =
-                                            deserializationSchema.deserialize(
-                                                    
line.getBytes(StandardCharsets.UTF_8));
-                                    if (!readColumns.isEmpty()) {
-                                        // need column projection
-                                        Object[] fields;
-                                        if (isMergePartition) {
-                                            fields =
-                                                    new Object
-                                                            [readColumns.size()
-                                                                    + 
partitionsMap.size()];
-                                        } else {
-                                            fields = new 
Object[readColumns.size()];
-                                        }
-                                        for (int i = 0; i < indexes.length; 
i++) {
-                                            fields[i] = 
seaTunnelRow.getField(indexes[i]);
-                                        }
-                                        seaTunnelRow = new 
SeaTunnelRow(fields);
-                                    }
-                                    if (isMergePartition) {
-                                        int index = 
seaTunnelRowType.getTotalFields();
-                                        for (String value : 
partitionsMap.values()) {
-                                            seaTunnelRow.setField(index++, 
value);
-                                        }
-                                    }
-                                    seaTunnelRow.setTableId(tableId);
-                                    output.collect(seaTunnelRow);
-                                } catch (IOException e) {
-                                    String errorMsg =
-                                            String.format(
-                                                    "Deserialize this data 
[%s] failed, please check the origin data",
-                                                    line);
-                                    throw new FileConnectorException(
-                                            
FileConnectorErrorCode.DATA_DESERIALIZE_FAILED,
-                                            errorMsg,
-                                            e);
-                                }
-                            });
+                        new BufferedReader(new 
InputStreamReader(actualInputStream, encoding));
+                CSVParser csvParser = new CSVParser(reader, csvFormat); ) {
+            for (int i = 0; i < skipHeaderNumber; i++) {
+                if (reader.readLine() == null) {
+                    throw new IOException(
+                            String.format(
+                                    "File [%s] has fewer lines than expected 
to skip.",
+                                    currentFileName));
+                }
+            }
+            // read lines
+            for (CSVRecord csvRecord : csvParser) {
+                HashMap<Integer, String> fieldIdValueMap = new HashMap<>();
+                for (int i = 0; i < 
inputCatalogTable.getTableSchema().getColumns().size(); i++) {
+                    fieldIdValueMap.put(i, csvRecord.get(i));
+                }
+                SeaTunnelRow seaTunnelRow = 
deserializationSchema.getSeaTunnelRow(fieldIdValueMap);
+                if (!readColumns.isEmpty()) {
+                    // need column projection
+                    Object[] fields;
+                    if (isMergePartition) {
+                        fields = new Object[readColumns.size() + 
partitionsMap.size()];
+                    } else {
+                        fields = new Object[readColumns.size()];
+                    }
+                    for (int i = 0; i < indexes.length; i++) {
+                        fields[i] = seaTunnelRow.getField(indexes[i]);
+                    }
+                    seaTunnelRow = new SeaTunnelRow(fields);
+                }
+                if (isMergePartition) {
+                    int index = seaTunnelRowType.getTotalFields();
+                    for (String value : partitionsMap.values()) {
+                        seaTunnelRow.setField(index++, value);
+                    }
+                }
+                seaTunnelRow.setTableId(tableId);
+                output.collect(seaTunnelRow);
+            }
+        } catch (IOException e) {
+            String errorMsg =
+                    String.format(
+                            "Deserialize this file [%s] failed, please check 
the origin data",
+                            currentFileName);
+            throw new FileConnectorException(
+                    FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, errorMsg, 
e);
         }
     }
 
@@ -177,6 +185,7 @@ public class CsvReadStrategy extends AbstractReadStrategy {
     @Override
     public void setCatalogTable(CatalogTable catalogTable) {
         SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+        this.inputCatalogTable = catalogTable;
         SeaTunnelRowType userDefinedRowTypeWithPartition =
                 mergePartitionTypes(fileNames.get(0), rowType);
         ReadonlyConfig readonlyConfig = 
ReadonlyConfig.fromConfig(pluginConfig);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index c31e2e8560..fc69268584 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -286,6 +286,11 @@ public class LocalFileIT extends TestSuiteBase {
                         
"/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
                         container);
 
+                ContainerUtil.copyFileIntoContainers(
+                        "/csv/break_line.csv",
+                        "/seatunnel/read/csv/break_line/break_line.csv",
+                        container);
+
                 ContainerUtil.copyFileIntoContainers(
                         "/text/e2e_null_format.txt",
                         "/seatunnel/read/e2e_null_format/e2e_null_format.txt",
@@ -300,6 +305,7 @@ public class LocalFileIT extends TestSuiteBase {
         TestHelper helper = new TestHelper(container);
         helper.execute("/csv/fake_to_local_csv.conf");
         helper.execute("/csv/local_csv_to_assert.conf");
+        helper.execute("/csv/breakline_csv_to_assert.conf");
         helper.execute("/excel/fake_to_local_excel.conf");
         helper.execute("/excel/local_excel_to_assert.conf");
         helper.execute("/excel/local_excel_projection_to_assert.conf");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/break_line.csv b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/break_line.csv
new file mode 100644
index 0000000000..f534c7811c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/break_line.csv
@@ -0,0 +1,3 @@
+20,"harry
+ potter"
+21,"tom"
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/breakline_csv_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/breakline_csv_to_assert.conf
new file mode 100644
index 0000000000..7a0c3caf65
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/csv/breakline_csv_to_assert.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/csv/break_line"
+    file_format_type = csv
+    schema = {
+      fields {
+        age = int
+        name = string
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+            {
+              rule_type = MAX_ROW
+              rule_value = 2
+            }
+             {
+                  rule_type = MIN_ROW
+                  rule_value = 2
+                }
+          ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java b/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java
index 551bba12b0..df4dc05cd6 100644
--- a/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-csv/src/main/java/org/apache/seatunnel/format/csv/CsvDeserializationSchema.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.format.csv;
 
-import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.ArrayType;
@@ -41,6 +40,7 @@ import org.apache.commons.lang3.StringUtils;
 import lombok.NonNull;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
@@ -58,7 +58,7 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Optional;
 
-public class CsvDeserializationSchema implements 
DeserializationSchema<SeaTunnelRow> {
+public class CsvDeserializationSchema implements Serializable {
     private final SeaTunnelRowType seaTunnelRowType;
     private final String[] separators;
     private final String encoding;
@@ -169,13 +169,17 @@ public class CsvDeserializationSchema implements DeserializationSchema<SeaTunnel
         }
     }
 
-    @Override
-    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+    protected SeaTunnelRow deserialize(byte[] message) throws IOException {
         if (message == null || message.length == 0) {
             return null;
         }
         String content = new String(message, 
EncodingUtils.tryParseCharset(encoding));
         Map<Integer, String> splitsMap = splitLineBySeaTunnelRowType(content, 
seaTunnelRowType, 0);
+        SeaTunnelRow seaTunnelRow = getSeaTunnelRow(splitsMap);
+        return seaTunnelRow;
+    }
+
+    public SeaTunnelRow getSeaTunnelRow(Map<Integer, String> splitsMap) {
         Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
         for (int i = 0; i < objects.length; i++) {
             String fieldValue = splitsMap.get(i);
@@ -201,12 +205,11 @@ public class CsvDeserializationSchema implements DeserializationSchema<SeaTunnel
         return seaTunnelRow;
     }
 
-    @Override
     public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
         return seaTunnelRowType;
     }
 
-    public Map<Integer, String> splitLineBySeaTunnelRowType(
+    protected Map<Integer, String> splitLineBySeaTunnelRowType(
             String line, SeaTunnelRowType seaTunnelRowType, int level) {
         String[] splits = processor.splitLine(line, separators[level]);
         LinkedHashMap<Integer, String> splitsMap = new LinkedHashMap<>();

Reply via email to