This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 8b5967275af branch-3.1: [feat](file-format) unify file format properties for tvf and outfile #50225 #50463 #50471 (#52101)
8b5967275af is described below

commit 8b5967275af55939a72e36caef69b6b309bccfcd
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Jun 23 09:57:35 2025 +0800

    branch-3.1: [feat](file-format) unify file format properties for tvf and outfile #50225 #50463 #50471 (#52101)
    
    bp #50225 #50463 #50471
    
    ---------
    
    Co-authored-by: Tiewei Fang <[email protected]>
---
 .../org/apache/doris/analysis/OutFileClause.java   | 292 ++++++---------------
 .../java/org/apache/doris/common/util/Util.java    |   6 +-
 .../fileformat/AvroFileFormatProperties.java       |  49 ++++
 .../fileformat/CsvFileFormatProperties.java        | 192 ++++++++++++++
 .../property/fileformat/FileFormatProperties.java  | 124 +++++++++
 .../fileformat/JsonFileFormatProperties.java       | 117 +++++++++
 .../fileformat/OrcFileFormatProperties.java        |  83 ++++++
 .../fileformat/ParquetFileFormatProperties.java    | 129 +++++++++
 .../fileformat/WalFileFormatProperties.java        |  49 ++++
 .../org/apache/doris/planner/ResultFileSink.java   |  11 +-
 .../ExternalFileTableValuedFunction.java           | 159 ++---------
 .../HttpStreamTableValuedFunction.java             |   6 +-
 .../fileformat/AvroFileFormatPropertiesTest.java   |  42 +++
 .../fileformat/CsvFileFormatPropertiesTest.java    | 224 ++++++++++++++++
 .../fileformat/FileFormatPropertiesTest.java       |  34 +++
 .../fileformat/JsonFileFormatPropertiesTest.java   | 199 ++++++++++++++
 .../fileformat/OrcFileFormatPropertiesTest.java    |  97 +++++++
 .../ParquetFileFormatPropertiesTest.java           | 139 ++++++++++
 .../fileformat/WalFileFormatPropertiesTest.java    |  41 +++
 19 files changed, 1642 insertions(+), 351 deletions(-)
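
For context, a minimal sketch of the unified flow this change introduces, pieced together from the new classes in this diff (illustration only, not part of the patch; the output path is hypothetical):

    // assumed imports: org.apache.doris.datasource.property.fileformat.*,
    //                  org.apache.doris.thrift.*, com.google.common.collect.Maps, java.util.Map
    FileFormatProperties props = FileFormatProperties.createFileFormatProperties("csv");

    Map<String, String> userProps = Maps.newHashMap();
    userProps.put("column_separator", ",");
    // `true` removes each consumed key, so callers can flag leftovers as unknown,
    // which is what the rewritten OutFileClause.analyzeProperties() below does.
    props.analyzeFileFormatProperties(userProps, true);

    // One handler now serves both sides:
    TResultFileSinkOptions sink = new TResultFileSinkOptions("s3://bucket/out_", // hypothetical path
            props.getFileFormatType());
    props.fullTResultFileSinkOptions(sink);            // writer path (outfile)
    TFileAttributes attrs = props.toTFileAttributes(); // reader path (tvf)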

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index adf601da1ea..744442955c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -30,19 +30,16 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PrintableMap;
-import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.property.PropertyConverter;
 import org.apache.doris.datasource.property.constants.S3Properties;
-import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
 import org.apache.doris.thrift.TFileFormatType;
-import org.apache.doris.thrift.TParquetCompressionType;
 import org.apache.doris.thrift.TParquetDataType;
 import org.apache.doris.thrift.TParquetRepetitionType;
 import org.apache.doris.thrift.TParquetSchema;
-import org.apache.doris.thrift.TParquetVersion;
 import org.apache.doris.thrift.TResultFileSinkOptions;
 
 import com.google.common.base.Preconditions;
@@ -61,7 +58,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 // For syntax select * from tbl INTO OUTFILE xxxx
 public class OutFileClause {
@@ -71,9 +67,6 @@ public class OutFileClause {
     public static final List<Type> RESULT_COL_TYPES = Lists.newArrayList();
     public static final Map<String, TParquetRepetitionType> PARQUET_REPETITION_TYPE_MAP = Maps.newHashMap();
     public static final Map<String, TParquetDataType> PARQUET_DATA_TYPE_MAP = Maps.newHashMap();
-    public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
-    public static final Map<String, TFileCompressType> ORC_COMPRESSION_TYPE_MAP = Maps.newHashMap();
-    public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
     public static final Set<String> ORC_DATA_TYPE = Sets.newHashSet();
     public static final String FILE_NUMBER = "FileNumber";
     public static final String TOTAL_ROWS = "TotalRows";
@@ -110,24 +103,6 @@ public class OutFileClause {
         PARQUET_DATA_TYPE_MAP.put("double", TParquetDataType.DOUBLE);
         PARQUET_DATA_TYPE_MAP.put("fixed_len_byte_array", 
TParquetDataType.FIXED_LEN_BYTE_ARRAY);
 
-        PARQUET_COMPRESSION_TYPE_MAP.put("snappy", 
TParquetCompressionType.SNAPPY);
-        PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
-        PARQUET_COMPRESSION_TYPE_MAP.put("brotli", 
TParquetCompressionType.BROTLI);
-        PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD);
-        PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4);
-        // arrow do not support lzo and bz2 compression type.
-        // PARQUET_COMPRESSION_TYPE_MAP.put("lzo", 
TParquetCompressionType.LZO);
-        // PARQUET_COMPRESSION_TYPE_MAP.put("bz2", 
TParquetCompressionType.BZ2);
-        PARQUET_COMPRESSION_TYPE_MAP.put("plain", 
TParquetCompressionType.UNCOMPRESSED);
-
-        ORC_COMPRESSION_TYPE_MAP.put("plain", TFileCompressType.PLAIN);
-        ORC_COMPRESSION_TYPE_MAP.put("snappy", TFileCompressType.SNAPPYBLOCK);
-        ORC_COMPRESSION_TYPE_MAP.put("zlib", TFileCompressType.ZLIB);
-        ORC_COMPRESSION_TYPE_MAP.put("zstd", TFileCompressType.ZSTD);
-
-        PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0);
-        PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST);
-
         ORC_DATA_TYPE.add("bigint");
         ORC_DATA_TYPE.add("boolean");
         ORC_DATA_TYPE.add("double");
@@ -152,9 +127,7 @@ public class OutFileClause {
     public static final String PROP_DELETE_EXISTING_FILES = "delete_existing_files";
     public static final String PROP_FILE_SUFFIX = "file_suffix";
     public static final String PROP_WITH_BOM = "with_bom";
-    public static final String COMPRESS_TYPE = "compress_type";
 
-    private static final String PARQUET_PROP_PREFIX = "parquet.";
     private static final String SCHEMA = "schema";
 
     private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB
@@ -162,13 +135,8 @@ public class OutFileClause {
     private static final long MAX_FILE_SIZE_BYTES = 2 * 1024 * 1024 * 1024L; // 2GB
 
     private String filePath;
-    private String format;
     private Map<String, String> properties;
 
-    // set following members after analyzing
-    private String columnSeparator = "\t";
-    private String lineDelimiter = "\n";
-    private TFileFormatType fileFormatType;
     private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES;
     private boolean deleteExistingFiles = false;
     private String fileSuffix = "";
@@ -184,43 +152,41 @@ public class OutFileClause {
     private List<Pair<String, String>> orcSchemas = new ArrayList<>();
 
     private boolean isAnalyzed = false;
-    private String headerType = "";
 
-    private TParquetCompressionType parquetCompressionType = TParquetCompressionType.SNAPPY;
-    private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;
-    private static final String PARQUET_DISABLE_DICTIONARY = "disable_dictionary";
-    private boolean parquetDisableDictionary = false;
-    private static final String PARQUET_VERSION = "version";
-    private static TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
+    private FileFormatProperties fileFormatProperties;
 
     public OutFileClause(String filePath, String format, Map<String, String> properties) {
         this.filePath = filePath;
-        this.format = Strings.isNullOrEmpty(format) ? "csv" : format.toLowerCase();
         this.properties = properties;
         this.isAnalyzed = false;
+        if (Strings.isNullOrEmpty(format)) {
+            fileFormatProperties = FileFormatProperties.createFileFormatProperties("csv");
+        } else {
+            fileFormatProperties = FileFormatProperties.createFileFormatProperties(format.toLowerCase());
+        }
     }
 
     public OutFileClause(OutFileClause other) {
         this.filePath = other.filePath;
-        this.format = other.format;
+        this.fileFormatProperties = other.fileFormatProperties;
         this.properties = other.properties == null ? null : Maps.newHashMap(other.properties);
         this.isAnalyzed = other.isAnalyzed;
     }
 
     public String getColumnSeparator() {
-        return columnSeparator;
+        return ((CsvFileFormatProperties) fileFormatProperties).getColumnSeparator();
     }
 
     public String getLineDelimiter() {
-        return lineDelimiter;
+        return ((CsvFileFormatProperties) fileFormatProperties).getLineDelimiter();
     }
 
     public String getHeaderType() {
-        return headerType;
+        return ((CsvFileFormatProperties) fileFormatProperties).getHeaderType();
     }
 
     public TFileFormatType getFileFormatType() {
-        return fileFormatType;
+        return fileFormatProperties.getFileFormatType();
     }
 
     public long getMaxFileSizeBytes() {
@@ -245,28 +211,6 @@ public class OutFileClause {
         }
         analyzeFilePath();
 
-        switch (this.format) {
-            case "csv":
-                fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "parquet":
-                fileFormatType = TFileFormatType.FORMAT_PARQUET;
-                break;
-            case "orc":
-                fileFormatType = TFileFormatType.FORMAT_ORC;
-                break;
-            case "csv_with_names":
-                headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
-                fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "csv_with_names_and_types":
-                headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
-                fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            default:
-                throw new AnalysisException("format:" + this.format + " is not 
supported.");
-        }
-
         analyzeProperties();
 
         if (brokerDesc != null && isLocalOutput) {
@@ -559,76 +503,60 @@ public class OutFileClause {
         if (properties == null || properties.isEmpty()) {
             return;
         }
+        // Copy the properties, because we will remove the key from properties.
+        Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
+        copiedProps.putAll(properties);
 
-        Set<String> processedPropKeys = Sets.newHashSet();
-        analyzeBrokerDesc(processedPropKeys);
+        analyzeBrokerDesc(copiedProps);
 
-        if (properties.containsKey(PROP_COLUMN_SEPARATOR)) {
-            if (!Util.isCsvFormat(fileFormatType)) {
-                throw new AnalysisException(PROP_COLUMN_SEPARATOR + " is only for CSV format");
-            }
-            columnSeparator = Separator.convertSeparator(properties.get(PROP_COLUMN_SEPARATOR));
-            processedPropKeys.add(PROP_COLUMN_SEPARATOR);
-        }
-
-        if (properties.containsKey(PROP_LINE_DELIMITER)) {
-            if (!Util.isCsvFormat(fileFormatType)) {
-                throw new AnalysisException(PROP_LINE_DELIMITER + " is only for CSV format");
-            }
-            lineDelimiter = Separator.convertSeparator(properties.get(PROP_LINE_DELIMITER));
-            processedPropKeys.add(PROP_LINE_DELIMITER);
-        }
+        fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
 
-        if (properties.containsKey(PROP_MAX_FILE_SIZE)) {
-            maxFileSizeBytes = ParseUtil.analyzeDataVolume(properties.get(PROP_MAX_FILE_SIZE));
+        if (copiedProps.containsKey(PROP_MAX_FILE_SIZE)) {
+            maxFileSizeBytes = ParseUtil.analyzeDataVolume(copiedProps.get(PROP_MAX_FILE_SIZE));
             if (maxFileSizeBytes > MAX_FILE_SIZE_BYTES || maxFileSizeBytes < MIN_FILE_SIZE_BYTES) {
                 throw new AnalysisException("max file size should between 5MB and 2GB. Given: " + maxFileSizeBytes);
             }
             }
-            processedPropKeys.add(PROP_MAX_FILE_SIZE);
+            copiedProps.remove(PROP_MAX_FILE_SIZE);
         }
 
-        if (properties.containsKey(PROP_DELETE_EXISTING_FILES)) {
-            deleteExistingFiles = Boolean.parseBoolean(properties.get(PROP_DELETE_EXISTING_FILES))
+        if (copiedProps.containsKey(PROP_DELETE_EXISTING_FILES)) {
+            deleteExistingFiles = Boolean.parseBoolean(copiedProps.get(PROP_DELETE_EXISTING_FILES))
                                     & Config.enable_delete_existing_files;
-            processedPropKeys.add(PROP_DELETE_EXISTING_FILES);
+            copiedProps.remove(PROP_DELETE_EXISTING_FILES);
         }
 
-        if (properties.containsKey(PROP_FILE_SUFFIX)) {
-            fileSuffix = properties.get(PROP_FILE_SUFFIX);
-            processedPropKeys.add(PROP_FILE_SUFFIX);
+        if (copiedProps.containsKey(PROP_FILE_SUFFIX)) {
+            fileSuffix = copiedProps.get(PROP_FILE_SUFFIX);
+            copiedProps.remove(PROP_FILE_SUFFIX);
         }
 
-        if (properties.containsKey(PROP_WITH_BOM)) {
-            withBom = Boolean.valueOf(properties.get(PROP_WITH_BOM)).booleanValue();
-            processedPropKeys.add(PROP_WITH_BOM);
+        if (copiedProps.containsKey(PROP_WITH_BOM)) {
+            withBom = Boolean.valueOf(copiedProps.get(PROP_WITH_BOM)).booleanValue();
+            copiedProps.remove(PROP_WITH_BOM);
         }
 
-        if (properties.containsKey(PROP_SUCCESS_FILE_NAME)) {
-            successFileName = properties.get(PROP_SUCCESS_FILE_NAME);
+        if (copiedProps.containsKey(PROP_SUCCESS_FILE_NAME)) {
+            successFileName = copiedProps.get(PROP_SUCCESS_FILE_NAME);
             FeNameFormat.checkOutfileSuccessFileName("file name", 
successFileName);
-            processedPropKeys.add(PROP_SUCCESS_FILE_NAME);
+            copiedProps.remove(PROP_SUCCESS_FILE_NAME);
         }
 
         // For Azure compatibility, this is temporarily added to the map without further processing.
         // The validity of each provider's value will be checked later in S3Properties' check.
-        if (properties.containsKey(S3Properties.PROVIDER)) {
-            processedPropKeys.add(S3Properties.PROVIDER);
+        if (copiedProps.containsKey(S3Properties.PROVIDER)) {
+            copiedProps.remove(S3Properties.PROVIDER);
         }
 
-        if (this.fileFormatType == TFileFormatType.FORMAT_PARQUET) {
-            getParquetProperties(processedPropKeys);
+        if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_PARQUET) {
+            getParquetProperties(copiedProps);
         }
 
-        if (this.fileFormatType == TFileFormatType.FORMAT_ORC) {
-            getOrcProperties(processedPropKeys);
+        if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_ORC) {
+            getOrcProperties(copiedProps);
         }
 
-        if (processedPropKeys.size() != properties.size()) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("{} vs {}", processedPropKeys, properties);
-            }
-            throw new AnalysisException("Unknown properties: " + 
properties.keySet().stream()
-                    .filter(k -> 
!processedPropKeys.contains(k)).collect(Collectors.toList()));
+        if (!copiedProps.isEmpty()) {
+            throw new AnalysisException("Unknown properties: " + 
copiedProps.keySet());
         }
     }
 
@@ -637,11 +565,11 @@ public class OutFileClause {
      * 1. broker: with broker name
      * 2. s3: with s3 pattern path, without broker name
      */
-    private void analyzeBrokerDesc(Set<String> processedPropKeys) throws UserException {
-        String brokerName = properties.get(PROP_BROKER_NAME);
+    private void analyzeBrokerDesc(Map<String, String> copiedProps) throws UserException {
+        String brokerName = copiedProps.get(PROP_BROKER_NAME);
+        String brokerName = copiedProps.get(PROP_BROKER_NAME);
         StorageBackend.StorageType storageType;
-        if (properties.containsKey(PROP_BROKER_NAME)) {
-            processedPropKeys.add(PROP_BROKER_NAME);
+        if (copiedProps.containsKey(PROP_BROKER_NAME)) {
+            copiedProps.remove(PROP_BROKER_NAME);
             storageType = StorageBackend.StorageType.BROKER;
         } else if (filePath.toUpperCase().startsWith(S3_FILE_PREFIX)) {
             brokerName = StorageBackend.StorageType.S3.name();
@@ -654,29 +582,32 @@ public class OutFileClause {
         }
 
         Map<String, String> brokerProps = Maps.newHashMap();
-        for (Map.Entry<String, String> entry : properties.entrySet()) {
+        Iterator<Map.Entry<String, String>> iterator = copiedProps.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
             if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) {
                 brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
-                processedPropKeys.add(entry.getKey());
+                iterator.remove();
             } else if (entry.getKey().toLowerCase().startsWith(S3Properties.S3_PREFIX)
                     || entry.getKey().toUpperCase().startsWith(S3Properties.Env.PROPERTIES_PREFIX)) {
                 brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
+                iterator.remove();
             } else if (entry.getKey().contains(HdfsResource.HADOOP_FS_NAME)
                     && storageType == StorageBackend.StorageType.HDFS) {
                 brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
+                iterator.remove();
             } else if ((entry.getKey().startsWith(HADOOP_FS_PROP_PREFIX)
                     || entry.getKey().startsWith(HADOOP_PROP_PREFIX))
                     && storageType == StorageBackend.StorageType.HDFS) {
                 brokerProps.put(entry.getKey(), entry.getValue());
-                processedPropKeys.add(entry.getKey());
+                iterator.remove();
             }
         }
+
         if (storageType == StorageBackend.StorageType.S3) {
-            if (properties.containsKey(PropertyConverter.USE_PATH_STYLE)) {
-                brokerProps.put(PropertyConverter.USE_PATH_STYLE, properties.get(PropertyConverter.USE_PATH_STYLE));
-                processedPropKeys.add(PropertyConverter.USE_PATH_STYLE);
+            if (copiedProps.containsKey(PropertyConverter.USE_PATH_STYLE)) {
+                brokerProps.put(PropertyConverter.USE_PATH_STYLE, copiedProps.get(PropertyConverter.USE_PATH_STYLE));
+                copiedProps.remove(PropertyConverter.USE_PATH_STYLE);
             }
             S3Properties.requiredS3Properties(brokerProps);
         } else if (storageType == StorageBackend.StorageType.HDFS) {
@@ -694,14 +625,6 @@ public class OutFileClause {
         return fullPath.replace(filePath, "");
     }
 
-    void setParquetVersion(String propertyValue) {
-        if (PARQUET_VERSION_MAP.containsKey(propertyValue)) {
-            this.parquetVersion = PARQUET_VERSION_MAP.get(propertyValue);
-        } else {
-            LOG.debug("not set parquet version type or is invalid, set default 
to PARQUET_1.0 version.");
-        }
-    }
-
     /**
      * example:
      * SELECT citycode FROM table1 INTO OUTFILE "file:///root/doris/"
@@ -713,36 +636,10 @@ public class OutFileClause {
      * prefix with 'parquet.' defines the properties of parquet file,
      * currently only supports: compression, disable_dictionary, version
      */
-    private void getParquetProperties(Set<String> processedPropKeys) throws AnalysisException {
-        // save compress type
-        if (properties.containsKey(COMPRESS_TYPE)) {
-            if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase())) {
-                this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(
-                        properties.get(COMPRESS_TYPE).toLowerCase());
-                processedPropKeys.add(COMPRESS_TYPE);
-            } else {
-                throw new AnalysisException("parquet compression type [" + properties.get(COMPRESS_TYPE)
-                        + "] is invalid, please choose one among SNAPPY, GZIP, BROTLI, ZSTD, LZ4, LZO, BZ2 or PLAIN");
-            }
-        }
-
-        // save all parquet prefix property
-        Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
-        while (iter.hasNext()) {
-            Map.Entry<String, String> entry = iter.next();
-            if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
-                processedPropKeys.add(entry.getKey());
-                if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_DISABLE_DICTIONARY)) {
-                    this.parquetDisableDictionary = Boolean.valueOf(entry.getValue());
-                } else if (entry.getKey().substring(PARQUET_PROP_PREFIX.length()).equals(PARQUET_VERSION)) {
-                    setParquetVersion(entry.getValue());
-                }
-            }
-        }
-
+    private void getParquetProperties(Map<String, String> copiedProps) throws AnalysisException {
         // check schema. if schema is not set, Doris will gen schema by select items
         // Note: These codes are useless and outdated.
-        String schema = properties.get(SCHEMA);
+        String schema = copiedProps.get(SCHEMA);
         if (schema == null) {
             return;
         }
@@ -753,43 +650,31 @@ public class OutFileClause {
         schema = schema.toLowerCase();
         String[] schemas = schema.split(";");
         for (String item : schemas) {
-            String[] properties = item.split(",");
-            if (properties.length != 3) {
+            String[] fields = item.split(",");
+            if (fields.length != 3) {
                 throw new AnalysisException("must only contains repetition 
type/column type/column name");
             }
-            if (!PARQUET_REPETITION_TYPE_MAP.containsKey(properties[0])) {
+            if (!PARQUET_REPETITION_TYPE_MAP.containsKey(fields[0])) {
                 throw new AnalysisException("unknown repetition type");
             }
-            if (!properties[0].equalsIgnoreCase("required")) {
+            if (!fields[0].equalsIgnoreCase("required")) {
                 throw new AnalysisException("currently only support required 
type");
             }
-            if (!PARQUET_DATA_TYPE_MAP.containsKey(properties[1])) {
-                throw new AnalysisException("data type is not supported:" + 
properties[1]);
+            if (!PARQUET_DATA_TYPE_MAP.containsKey(fields[1])) {
+                throw new AnalysisException("data type is not supported:" + 
fields[1]);
             }
             TParquetSchema parquetSchema = new TParquetSchema();
-            parquetSchema.schema_repetition_type = 
PARQUET_REPETITION_TYPE_MAP.get(properties[0]);
-            parquetSchema.schema_data_type = 
PARQUET_DATA_TYPE_MAP.get(properties[1]);
-            parquetSchema.schema_column_name = properties[2];
+            parquetSchema.schema_repetition_type = 
PARQUET_REPETITION_TYPE_MAP.get(fields[0]);
+            parquetSchema.schema_data_type = 
PARQUET_DATA_TYPE_MAP.get(fields[1]);
+            parquetSchema.schema_column_name = fields[2];
             parquetSchemas.add(parquetSchema);
         }
-        processedPropKeys.add(SCHEMA);
+        copiedProps.remove(SCHEMA);
     }
 
-    private void getOrcProperties(Set<String> processedPropKeys) throws AnalysisException {
-        // get compression type
-        // save compress type
-        if (properties.containsKey(COMPRESS_TYPE)) {
-            if (ORC_COMPRESSION_TYPE_MAP.containsKey(properties.get(COMPRESS_TYPE).toLowerCase())) {
-                this.orcCompressionType = ORC_COMPRESSION_TYPE_MAP.get(properties.get(COMPRESS_TYPE).toLowerCase());
-                processedPropKeys.add(COMPRESS_TYPE);
-            } else {
-                throw new AnalysisException("orc compression type [" + properties.get(COMPRESS_TYPE) + "] is invalid,"
-                        + " please choose one among ZLIB, SNAPPY, ZSTD or PLAIN");
-            }
-        }
-
+    private void getOrcProperties(Map<String, String> copiedProps) throws AnalysisException {
         // check schema. if schema is not set, Doris will gen schema by select items
-        String schema = properties.get(SCHEMA);
+        String schema = copiedProps.get(SCHEMA);
         if (schema == null) {
             return;
         }
@@ -800,15 +685,15 @@ public class OutFileClause {
         schema = schema.toLowerCase();
         String[] schemas = schema.split(";");
         for (String item : schemas) {
-            String[] properties = item.split(",");
-            if (properties.length != 2) {
+            String[] fields = item.split(",");
+            if (fields.length != 2) {
                 throw new AnalysisException("must only contains type and 
column name");
             }
-            if (!ORC_DATA_TYPE.contains(properties[1]) && 
!properties[1].startsWith("decimal")) {
-                throw new AnalysisException("data type is not supported:" + 
properties[1]);
-            } else if (!ORC_DATA_TYPE.contains(properties[1]) && 
properties[1].startsWith("decimal")) {
+            if (!ORC_DATA_TYPE.contains(fields[1]) && 
!fields[1].startsWith("decimal")) {
+                throw new AnalysisException("data type is not supported:" + 
fields[1]);
+            } else if (!ORC_DATA_TYPE.contains(fields[1]) && 
fields[1].startsWith("decimal")) {
                 String errorMsg = "Format of decimal type must be 
decimal(%d,%d)";
-                String precisionAndScale = properties[1].substring(0, 
"decimal".length()).trim();
+                String precisionAndScale = fields[1].substring(0, 
"decimal".length()).trim();
                 if (!precisionAndScale.startsWith("(") || 
!precisionAndScale.endsWith(")")) {
                     throw new AnalysisException(errorMsg);
                 }
@@ -817,17 +702,17 @@ public class OutFileClause {
                     throw new AnalysisException(errorMsg);
                 }
             }
-            orcSchemas.add(Pair.of(properties[0], properties[1]));
+            orcSchemas.add(Pair.of(fields[0], fields[1]));
         }
-        processedPropKeys.add(SCHEMA);
+        copiedProps.remove(SCHEMA);
     }
 
     private boolean isParquetFormat() {
-        return fileFormatType == TFileFormatType.FORMAT_PARQUET;
+        return fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_PARQUET;
     }
 
     private boolean isOrcFormat() {
-        return fileFormatType == TFileFormatType.FORMAT_ORC;
+        return fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_ORC;
     }
 
     public String getFilePath() {
@@ -845,7 +730,8 @@ public class OutFileClause {
 
     public String toSql() {
         StringBuilder sb = new StringBuilder();
-        sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS 
").append(format);
+        sb.append(" INTO OUTFILE '").append(filePath).append(" FORMAT AS ")
+                .append(fileFormatProperties.getFileFormatType());
         if (properties != null && !properties.isEmpty()) {
             sb.append(" PROPERTIES(");
             sb.append(new PrintableMap<>(properties, " = ", true, false));
@@ -864,11 +750,10 @@ public class OutFileClause {
     }
 
     public TResultFileSinkOptions toSinkOptions() {
-        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions(filePath, fileFormatType);
-        if (Util.isCsvFormat(fileFormatType)) {
-            sinkOptions.setColumnSeparator(columnSeparator);
-            sinkOptions.setLineDelimiter(lineDelimiter);
-        }
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions(filePath,
+                fileFormatProperties.getFileFormatType());
+        fileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+
         sinkOptions.setMaxFileSizeBytes(maxFileSizeBytes);
         sinkOptions.setDeleteExistingFiles(deleteExistingFiles);
         sinkOptions.setFileSuffix(fileSuffix);
@@ -883,15 +768,10 @@ public class OutFileClause {
             sinkOptions.setSuccessFileName(successFileName);
         }
         if (isParquetFormat()) {
-            sinkOptions.setParquetCompressionType(parquetCompressionType);
-            sinkOptions.setParquetDisableDictionary(parquetDisableDictionary);
-            sinkOptions.setParquetVersion(parquetVersion);
             sinkOptions.setParquetSchemas(parquetSchemas);
         }
         if (isOrcFormat()) {
             sinkOptions.setOrcSchema(serializeOrcSchema());
-            sinkOptions.setOrcCompressionType(orcCompressionType);
-            sinkOptions.setOrcWriterVersion(1);
         }
         return sinkOptions;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index c394d9abc28..99e226f87ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -614,7 +614,11 @@ public class Util {
             return TFileCompressType.UNKNOWN;
         }
         final String upperCaseType = compressType.toUpperCase();
-        return TFileCompressType.valueOf(upperCaseType);
+        try {
+            return TFileCompressType.valueOf(upperCaseType);
+        } catch (IllegalArgumentException e) {
+            return TFileCompressType.UNKNOWN;
+        }
     }
 
     /**
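
A quick sketch of the behavior change above: unrecognized compress type strings now map to UNKNOWN instead of letting IllegalArgumentException escape (illustration only):

    // assumed imports: org.apache.doris.common.util.Util, org.apache.doris.thrift.TFileCompressType
    TFileCompressType a = Util.getFileCompressType("zstd");     // ZSTD, unchanged behavior
    TFileCompressType b = Util.getFileCompressType("not-real"); // UNKNOWN now; previously threw IllegalArgumentException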
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
new file mode 100644
index 00000000000..6d2b799ea00
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatProperties.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+public class AvroFileFormatProperties extends FileFormatProperties {
+    public AvroFileFormatProperties() {
+        super(TFileFormatType.FORMAT_AVRO);
+    }
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+    }
+
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        return fileAttributes;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
new file mode 100644
index 00000000000..0efea98a5c3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatProperties.java
@@ -0,0 +1,192 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.analysis.Separator;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.common.util.Util;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+import org.apache.doris.thrift.TTextSerdeType;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public class CsvFileFormatProperties extends FileFormatProperties {
+    public static final Logger LOG = LogManager.getLogger(
+            org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties.class);
+
+    public static final String DEFAULT_COLUMN_SEPARATOR = "\t";
+    public static final String DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR = "\001";
+    public static final String DEFAULT_LINE_DELIMITER = "\n";
+
+    public static final String PROP_COLUMN_SEPARATOR = "column_separator";
+    public static final String PROP_LINE_DELIMITER = "line_delimiter";
+
+    public static final String PROP_SKIP_LINES = "skip_lines";
+    public static final String PROP_CSV_SCHEMA = "csv_schema";
+    public static final String PROP_COMPRESS_TYPE = "compress_type";
+    public static final String PROP_TRIM_DOUBLE_QUOTES = "trim_double_quotes";
+
+    public static final String PROP_ENCLOSE = "enclose";
+
+    private String headerType = "";
+    private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
+    private String columnSeparator = DEFAULT_COLUMN_SEPARATOR;
+    private String lineDelimiter = DEFAULT_LINE_DELIMITER;
+    private boolean trimDoubleQuotes;
+    private int skipLines;
+    private byte enclose;
+
+    // used by tvf
+    // User specified csv columns, it will override columns got from file
+    private final List<Column> csvSchema = Lists.newArrayList();
+
+    String defaultColumnSeparator = DEFAULT_COLUMN_SEPARATOR;
+
+    public CsvFileFormatProperties() {
+        super(TFileFormatType.FORMAT_CSV_PLAIN);
+    }
+
+    public CsvFileFormatProperties(String defaultColumnSeparator, TTextSerdeType textSerdeType) {
+        super(TFileFormatType.FORMAT_CSV_PLAIN);
+        this.defaultColumnSeparator = defaultColumnSeparator;
+        this.textSerdeType = textSerdeType;
+    }
+
+    public CsvFileFormatProperties(String headerType) {
+        super(TFileFormatType.FORMAT_CSV_PLAIN);
+        this.headerType = headerType;
+    }
+
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+        try {
+            // analyze properties specified by user
+            columnSeparator = getOrDefault(formatProperties, PROP_COLUMN_SEPARATOR,
+                    defaultColumnSeparator, isRemoveOriginProperty);
+            if (Strings.isNullOrEmpty(columnSeparator)) {
+                throw new AnalysisException("column_separator can not be 
empty.");
+            }
+            columnSeparator = Separator.convertSeparator(columnSeparator);
+
+            lineDelimiter = getOrDefault(formatProperties, PROP_LINE_DELIMITER,
+                    DEFAULT_LINE_DELIMITER, isRemoveOriginProperty);
+            if (Strings.isNullOrEmpty(lineDelimiter)) {
+                throw new AnalysisException("line_delimiter can not be 
empty.");
+            }
+            lineDelimiter = Separator.convertSeparator(lineDelimiter);
+
+            String enclosedString = getOrDefault(formatProperties, PROP_ENCLOSE,
+                    "", isRemoveOriginProperty);
+            if (!Strings.isNullOrEmpty(enclosedString)) {
+                if (enclosedString.length() > 1) {
+                    throw new AnalysisException("enclose should not be longer 
than one byte.");
+                }
+                enclose = (byte) enclosedString.charAt(0);
+                if (enclose == 0) {
+                    throw new AnalysisException("enclose should not be byte 
[0].");
+                }
+            }
+
+            trimDoubleQuotes = Boolean.valueOf(getOrDefault(formatProperties,
+                    PROP_TRIM_DOUBLE_QUOTES, "", isRemoveOriginProperty))
+                    .booleanValue();
+            skipLines = Integer.valueOf(getOrDefault(formatProperties,
+                    PROP_SKIP_LINES, "0", isRemoveOriginProperty)).intValue();
+            if (skipLines < 0) {
+                throw new AnalysisException("skipLines should not be less than 
0.");
+            }
+
+            String compressTypeStr = getOrDefault(formatProperties,
+                    PROP_COMPRESS_TYPE, "UNKNOWN", isRemoveOriginProperty);
+            compressionType = Util.getFileCompressType(compressTypeStr);
+
+        } catch (org.apache.doris.common.AnalysisException e) {
+            throw new AnalysisException(e.getMessage());
+        }
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+        sinkOptions.setColumnSeparator(columnSeparator);
+        sinkOptions.setLineDelimiter(lineDelimiter);
+    }
+
+    // The method `analyzeFileFormatProperties` must have been called once before this method
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
+        fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
+        if (this.enclose != 0) {
+            fileTextScanRangeParams.setEnclose(this.enclose);
+        }
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        fileAttributes.setHeaderType(headerType);
+        fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes);
+        fileAttributes.setSkipLines(skipLines);
+        fileAttributes.setEnableTextValidateUtf8(
+                ConnectContext.get().getSessionVariable().enableTextValidateUtf8);
+        return fileAttributes;
+    }
+
+    public String getHeaderType() {
+        return headerType;
+    }
+
+    public TTextSerdeType getTextSerdeType() {
+        return textSerdeType;
+    }
+
+    public String getColumnSeparator() {
+        return columnSeparator;
+    }
+
+    public String getLineDelimiter() {
+        return lineDelimiter;
+    }
+
+    public boolean isTrimDoubleQuotes() {
+        return trimDoubleQuotes;
+    }
+
+    public int getSkipLines() {
+        return skipLines;
+    }
+
+    public byte getEnclose() {
+        return enclose;
+    }
+
+    public List<Column> getCsvSchema() {
+        return csvSchema;
+    }
+}
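
A small sketch of the CSV handler in isolation (illustration only; the keys are the PROP_* constants defined above):

    // assumed imports: java.util.Map, com.google.common.collect.Maps,
    //                  org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties
    Map<String, String> props = Maps.newHashMap();
    props.put(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, "|");
    props.put(CsvFileFormatProperties.PROP_SKIP_LINES, "1");

    CsvFileFormatProperties csv = new CsvFileFormatProperties();
    csv.analyzeFileFormatProperties(props, true);
    // props is empty now (recognized keys were consumed);
    // csv.getColumnSeparator() returns "|" and csv.getSkipLines() returns 1.
    // Note: toTFileAttributes() reads a session variable via ConnectContext,
    // so it needs an active connect context.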
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
new file mode 100644
index 00000000000..bd0ecc214c6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/FileFormatProperties.java
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+import org.apache.doris.thrift.TTextSerdeType;
+
+import java.util.Map;
+
+public abstract class FileFormatProperties {
+    public static final String PROP_FORMAT = "format";
+    public static final String FORMAT_PARQUET = "parquet";
+    public static final String FORMAT_CSV = "csv";
+    public static final String FORMAT_CSV_WITH_NAMES = "csv_with_names";
+    public static final String FORMAT_CSV_WITH_NAMES_AND_TYPES = "csv_with_names_and_types";
+    public static final String FORMAT_HIVE_TEXT = "hive_text";
+    public static final String FORMAT_ORC = "orc";
+    public static final String FORMAT_JSON = "json";
+    public static final String FORMAT_AVRO = "avro";
+    public static final String FORMAT_WAL = "wal";
+    public static final String FORMAT_ARROW = "arrow";
+    public static final String PROP_COMPRESS_TYPE = "compress_type";
+
+    protected TFileFormatType fileFormatType;
+
+    protected TFileCompressType compressionType;
+
+    public FileFormatProperties(TFileFormatType fileFormatType) {
+        this.fileFormatType = fileFormatType;
+    }
+
+    /**
+     * Analyze user properties
+     * @param formatProperties properties specified by user
+     * @param isRemoveOriginProperty if this param is set to true, then this method would remove the origin property
+     * @throws AnalysisException
+     */
+    public abstract void analyzeFileFormatProperties(
+            Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException;
+
+    /**
+     * generate TResultFileSinkOptions according to the properties of specified file format
+     * You must call method `analyzeFileFormatProperties` once before calling method `toTResultFileSinkOptions`
+     */
+    public abstract void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions);
+
+    /**
+     * generate TFileAttributes according to the properties of specified file format
+     * You must call method `analyzeFileFormatProperties` once before calling method `toTFileAttributes`
+     */
+    public abstract TFileAttributes toTFileAttributes();
+
+    public static FileFormatProperties createFileFormatProperties(String formatString) {
+        switch (formatString) {
+            case FORMAT_CSV:
+                return new CsvFileFormatProperties();
+            case FORMAT_HIVE_TEXT:
+                return new CsvFileFormatProperties(CsvFileFormatProperties.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR,
+                        TTextSerdeType.HIVE_TEXT_SERDE);
+            case FORMAT_CSV_WITH_NAMES:
+                return new CsvFileFormatProperties(
+                        FORMAT_CSV_WITH_NAMES);
+            case FORMAT_CSV_WITH_NAMES_AND_TYPES:
+                return new CsvFileFormatProperties(
+                        FORMAT_CSV_WITH_NAMES_AND_TYPES);
+            case FORMAT_PARQUET:
+                return new ParquetFileFormatProperties();
+            case FORMAT_ORC:
+                return new OrcFileFormatProperties();
+            case FORMAT_JSON:
+                return new JsonFileFormatProperties();
+            case FORMAT_AVRO:
+                return new AvroFileFormatProperties();
+            case FORMAT_WAL:
+                return new WalFileFormatProperties();
+            default:
+                throw new AnalysisException("format:" + formatString + " is 
not supported.");
+        }
+    }
+
+    public static FileFormatProperties createFileFormatProperties(Map<String, String> formatProperties)
+            throws AnalysisException {
+        String formatString = formatProperties.getOrDefault(PROP_FORMAT, "")
+                .toLowerCase();
+        return createFileFormatProperties(formatString);
+    }
+
+    protected String getOrDefault(Map<String, String> props, String key, String defaultValue,
+            boolean isRemove) {
+        String value = props.getOrDefault(key, defaultValue);
+        if (isRemove) {
+            props.remove(key);
+        }
+        return value;
+    }
+
+    public TFileFormatType getFileFormatType() {
+        return fileFormatType;
+    }
+
+    public TFileCompressType getCompressionType() {
+        return compressionType;
+    }
+}
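
The map-based factory overload keys off the "format" property; sketched usage (illustration only):

    // assumed imports: java.util.Map, com.google.common.collect.Maps,
    //                  org.apache.doris.datasource.property.fileformat.*
    Map<String, String> props = Maps.newHashMap();
    props.put(FileFormatProperties.PROP_FORMAT, "json");
    props.put("read_json_by_line", "true");

    // Resolves to JsonFileFormatProperties via the switch above;
    // an unrecognized format string throws AnalysisException.
    FileFormatProperties ffp = FileFormatProperties.createFileFormatProperties(props);
    ffp.analyzeFileFormatProperties(props, true);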
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
new file mode 100644
index 00000000000..3431d366f8b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatProperties.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.common.util.Util;
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+public class JsonFileFormatProperties extends FileFormatProperties {
+    public static final String PROP_JSON_ROOT = "json_root";
+    public static final String PROP_JSON_PATHS = "jsonpaths";
+    public static final String PROP_STRIP_OUTER_ARRAY = "strip_outer_array";
+    public static final String PROP_READ_JSON_BY_LINE = "read_json_by_line";
+    public static final String PROP_NUM_AS_STRING = "num_as_string";
+    public static final String PROP_FUZZY_PARSE = "fuzzy_parse";
+
+    // from ExternalFileTableValuedFunction:
+    private String jsonRoot = "";
+    private String jsonPaths = "";
+    private boolean stripOuterArray;
+    private boolean readJsonByLine;
+    private boolean numAsString;
+    private boolean fuzzyParse;
+
+
+    public JsonFileFormatProperties() {
+        super(TFileFormatType.FORMAT_JSON);
+    }
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+        jsonRoot = getOrDefault(formatProperties, PROP_JSON_ROOT,
+                "", isRemoveOriginProperty);
+        jsonPaths = getOrDefault(formatProperties, PROP_JSON_PATHS,
+                "", isRemoveOriginProperty);
+        readJsonByLine = Boolean.valueOf(
+                getOrDefault(formatProperties, PROP_READ_JSON_BY_LINE,
+                        "", isRemoveOriginProperty)).booleanValue();
+        stripOuterArray = Boolean.valueOf(
+                getOrDefault(formatProperties, PROP_STRIP_OUTER_ARRAY,
+                        "", isRemoveOriginProperty)).booleanValue();
+        numAsString = Boolean.valueOf(
+                getOrDefault(formatProperties, PROP_NUM_AS_STRING,
+                        "", isRemoveOriginProperty)).booleanValue();
+        fuzzyParse = Boolean.valueOf(
+                getOrDefault(formatProperties, PROP_FUZZY_PARSE,
+                        "", isRemoveOriginProperty)).booleanValue();
+
+        String compressTypeStr = getOrDefault(formatProperties, PROP_COMPRESS_TYPE,
+                "UNKNOWN", isRemoveOriginProperty);
+        compressionType = Util.getFileCompressType(compressTypeStr);
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+    }
+
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileTextScanRangeParams.setLineDelimiter(CsvFileFormatProperties.DEFAULT_LINE_DELIMITER);
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        fileAttributes.setJsonRoot(jsonRoot);
+        fileAttributes.setJsonpaths(jsonPaths);
+        fileAttributes.setReadJsonByLine(readJsonByLine);
+        fileAttributes.setStripOuterArray(stripOuterArray);
+        fileAttributes.setNumAsString(numAsString);
+        fileAttributes.setFuzzyParse(fuzzyParse);
+        return fileAttributes;
+    }
+
+    public String getJsonRoot() {
+        return jsonRoot;
+    }
+
+    public String getJsonPaths() {
+        return jsonPaths;
+    }
+
+    public boolean isStripOuterArray() {
+        return stripOuterArray;
+    }
+
+    public boolean isReadJsonByLine() {
+        return readJsonByLine;
+    }
+
+    public boolean isNumAsString() {
+        return numAsString;
+    }
+
+    public boolean isFuzzyParse() {
+        return fuzzyParse;
+    }
+}
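
How the JSON handler feeds a TVF scan, sketched from the methods above (illustration only):

    // assumed imports: java.util.Map, com.google.common.collect.Maps,
    //                  org.apache.doris.thrift.TFileAttributes
    Map<String, String> props = Maps.newHashMap();
    props.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "true");
    props.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "true");

    JsonFileFormatProperties json = new JsonFileFormatProperties();
    json.analyzeFileFormatProperties(props, true);

    // Carries the parsed flags to the BE scanner; the line delimiter
    // falls back to the CSV default ("\n").
    TFileAttributes attrs = json.toTFileAttributes();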
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
new file mode 100644
index 00000000000..412c3d237e8
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatProperties.java
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+public class OrcFileFormatProperties extends FileFormatProperties {
+    public static final Map<String, TFileCompressType> ORC_COMPRESSION_TYPE_MAP = Maps.newHashMap();
+
+    static {
+        ORC_COMPRESSION_TYPE_MAP.put("plain", TFileCompressType.PLAIN);
+        ORC_COMPRESSION_TYPE_MAP.put("snappy", TFileCompressType.SNAPPYBLOCK);
+        ORC_COMPRESSION_TYPE_MAP.put("zlib", TFileCompressType.ZLIB);
+        ORC_COMPRESSION_TYPE_MAP.put("zstd", TFileCompressType.ZSTD);
+    }
+
+    private TFileCompressType orcCompressionType = TFileCompressType.ZLIB;
+
+    public OrcFileFormatProperties() {
+        super(TFileFormatType.FORMAT_ORC);
+    }
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+        // get compression type
+        // save compress type
+        if (formatProperties.containsKey(PROP_COMPRESS_TYPE)) {
+            if (ORC_COMPRESSION_TYPE_MAP.containsKey(
+                    formatProperties.get(PROP_COMPRESS_TYPE).toLowerCase())) {
+                this.orcCompressionType = ORC_COMPRESSION_TYPE_MAP.get(
+                        formatProperties.get(PROP_COMPRESS_TYPE).toLowerCase());
+                formatProperties.remove(PROP_COMPRESS_TYPE);
+            } else {
+                throw new AnalysisException("orc compression type ["
+                        + formatProperties.get(PROP_COMPRESS_TYPE) + "] is invalid,"
+                        + " please choose one among ZLIB, SNAPPY, ZSTD or PLAIN");
+            }
+        }
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+        sinkOptions.setOrcCompressionType(orcCompressionType);
+        sinkOptions.setOrcWriterVersion(1);
+    }
+
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        return fileAttributes;
+    }
+
+    public TFileCompressType getOrcCompressionType() {
+        return orcCompressionType;
+    }
+}
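
Compress-type validation for ORC, sketched (illustration only; the four valid values are the keys of ORC_COMPRESSION_TYPE_MAP):

    // assumed imports: java.util.Map, com.google.common.collect.Maps,
    //                  org.apache.doris.datasource.property.fileformat.*
    Map<String, String> props = Maps.newHashMap();
    props.put(FileFormatProperties.PROP_COMPRESS_TYPE, "zstd");

    OrcFileFormatProperties orc = new OrcFileFormatProperties();
    orc.analyzeFileFormatProperties(props, true); // consumes the key, sets ZSTD

    // props.put(FileFormatProperties.PROP_COMPRESS_TYPE, "lz4");
    // orc.analyzeFileFormatProperties(props, true); // would throw AnalysisException: not in the map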
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
new file mode 100644
index 00000000000..8063b25964a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatProperties.java
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TParquetCompressionType;
+import org.apache.doris.thrift.TParquetVersion;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import com.google.common.collect.Maps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class ParquetFileFormatProperties extends FileFormatProperties {
+    public static final String PARQUET_DISABLE_DICTIONARY = "disable_dictionary";
+    public static final String PARQUET_VERSION = "version";
+    public static final String PARQUET_PROP_PREFIX = "parquet.";
+
+    public static final Logger LOG = LogManager.getLogger(ParquetFileFormatProperties.class);
+    public static final Map<String, TParquetCompressionType> PARQUET_COMPRESSION_TYPE_MAP = Maps.newHashMap();
+    public static final Map<String, TParquetVersion> PARQUET_VERSION_MAP = Maps.newHashMap();
+
+    static {
+        PARQUET_COMPRESSION_TYPE_MAP.put("snappy", 
TParquetCompressionType.SNAPPY);
+        PARQUET_COMPRESSION_TYPE_MAP.put("gzip", TParquetCompressionType.GZIP);
+        PARQUET_COMPRESSION_TYPE_MAP.put("brotli", 
TParquetCompressionType.BROTLI);
+        PARQUET_COMPRESSION_TYPE_MAP.put("zstd", TParquetCompressionType.ZSTD);
+        PARQUET_COMPRESSION_TYPE_MAP.put("lz4", TParquetCompressionType.LZ4);
+        // arrow do not support lzo and bz2 compression type.
+        // PARQUET_COMPRESSION_TYPE_MAP.put("lzo", 
TParquetCompressionType.LZO);
+        // PARQUET_COMPRESSION_TYPE_MAP.put("bz2", 
TParquetCompressionType.BZ2);
+        PARQUET_COMPRESSION_TYPE_MAP.put("plain", 
TParquetCompressionType.UNCOMPRESSED);
+
+        PARQUET_VERSION_MAP.put("v1", TParquetVersion.PARQUET_1_0);
+        PARQUET_VERSION_MAP.put("latest", TParquetVersion.PARQUET_2_LATEST);
+    }
+
+    private TParquetCompressionType parquetCompressionType = TParquetCompressionType.SNAPPY;
+    private boolean parquetDisableDictionary = false;
+    private TParquetVersion parquetVersion = TParquetVersion.PARQUET_1_0;
+
+    public ParquetFileFormatProperties() {
+        super(TFileFormatType.FORMAT_PARQUET);
+    }
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+        // save compress type
+        if (formatProperties.containsKey(PROP_COMPRESS_TYPE)) {
+            if (PARQUET_COMPRESSION_TYPE_MAP.containsKey(formatProperties.get(PROP_COMPRESS_TYPE)
+                    .toLowerCase())) {
+                this.parquetCompressionType = PARQUET_COMPRESSION_TYPE_MAP.get(
+                        formatProperties.get(PROP_COMPRESS_TYPE).toLowerCase());
+                formatProperties.remove(PROP_COMPRESS_TYPE);
+            } else {
+                throw new AnalysisException("parquet compression type ["
+                        + formatProperties.get(PROP_COMPRESS_TYPE)
+                        + "] is invalid, please choose one among SNAPPY, GZIP, BROTLI, ZSTD, LZ4 or PLAIN");
+            }
+        }
+
+        // save all properties with the "parquet." prefix
+        Iterator<Entry<String, String>> iter = formatProperties.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, String> entry = iter.next();
+            if (entry.getKey().startsWith(PARQUET_PROP_PREFIX)) {
+                iter.remove();
+                if (entry.getKey().substring(PARQUET_PROP_PREFIX.length())
+                        .equals(PARQUET_DISABLE_DICTIONARY)) {
+                    this.parquetDisableDictionary = Boolean.valueOf(entry.getValue());
+                } else if (entry.getKey().substring(PARQUET_PROP_PREFIX.length())
+                        .equals(PARQUET_VERSION)) {
+                    if (PARQUET_VERSION_MAP.containsKey(entry.getValue())) {
+                        this.parquetVersion = PARQUET_VERSION_MAP.get(entry.getValue());
+                    } else {
+                        LOG.debug("parquet version is not set or invalid, use default version PARQUET_1_0.");
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+        sinkOptions.setParquetCompressionType(parquetCompressionType);
+        sinkOptions.setParquetDisableDictionary(parquetDisableDictionary);
+        sinkOptions.setParquetVersion(parquetVersion);
+    }
+
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        return fileAttributes;
+    }
+
+    public TParquetCompressionType getParquetCompressionType() {
+        return parquetCompressionType;
+    }
+
+    public boolean isParquetDisableDictionary() {
+        return parquetDisableDictionary;
+    }
+}
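
A similar sketch for the Parquet class (illustrative, not part of the patch):
parquet-specific knobs are namespaced with the "parquet." prefix and consumed
from the property map as they are parsed.

    import org.apache.doris.datasource.property.fileformat.ParquetFileFormatProperties;

    import java.util.HashMap;
    import java.util.Map;

    public class ParquetPropertiesUsageSketch {
        public static ParquetFileFormatProperties parse() {
            Map<String, String> props = new HashMap<>();
            props.put("compress_type", "zstd");
            props.put("parquet.disable_dictionary", "true");
            props.put("parquet.version", "latest");
            ParquetFileFormatProperties parquetProps = new ParquetFileFormatProperties();
            // Consumes compress_type and every "parquet."-prefixed key;
            // an unrecognized parquet.version falls back to PARQUET_1_0.
            parquetProps.analyzeFileFormatProperties(props, true);
            return parquetProps;
        }
    }
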
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
new file mode 100644
index 00000000000..0c6b1777cf6
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/property/fileformat/WalFileFormatProperties.java
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileTextScanRangeParams;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import java.util.Map;
+
+public class WalFileFormatProperties extends FileFormatProperties {
+    public WalFileFormatProperties() {
+        super(TFileFormatType.FORMAT_WAL);
+    }
+
+    @Override
+    public void fullTResultFileSinkOptions(TResultFileSinkOptions sinkOptions) {
+    }
+
+    @Override
+    public TFileAttributes toTFileAttributes() {
+        TFileAttributes fileAttributes = new TFileAttributes();
+        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
+        fileAttributes.setTextParams(fileTextScanRangeParams);
+        return fileAttributes;
+    }
+
+    @Override
+    public void analyzeFileFormatProperties(Map<String, String> formatProperties, boolean isRemoveOriginProperty)
+            throws AnalysisException {
+    }
+}
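
WAL carries no user-facing format options, so the class above only pins the
format type; a test-style sketch of that contract (hypothetical, not in the
patch, written to slot into a JUnit class like the tests below):

    @Test
    public void walRecognizesNoOptions() {
        WalFileFormatProperties walProps = new WalFileFormatProperties();
        // analyzeFileFormatProperties is a no-op; the map is left untouched.
        walProps.analyzeFileFormatProperties(new HashMap<String, String>(), true);
        Assert.assertEquals(TFileFormatType.FORMAT_WAL, walProps.getFileFormatType());
    }
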
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index 3dbd5bc2115..2f8f9ae8ab1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -25,6 +25,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.common.util.FileFormatConstants;
+import org.apache.doris.common.util.Util;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -69,11 +70,13 @@ public class ResultFileSink extends DataSink {
 
     public ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause, ArrayList<String> labels) {
         this(exchNodeId, outFileClause);
-        if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
-                || outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
-            header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter());
+        if (Util.isCsvFormat(outFileClause.getFileFormatType())) {
+            if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
+                    || outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
+                header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter());
+            }
+            headerType = outFileClause.getHeaderType();
         }
-        headerType = outFileClause.getHeaderType();
     }
 
     public String getBrokerName() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index 5a15ccd21f5..9ab6d302c3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -18,7 +18,6 @@
 package org.apache.doris.tablefunction;
 
 import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.Separator;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.ArrayType;
 import org.apache.doris.catalog.Column;
@@ -41,6 +40,8 @@ import org.apache.doris.common.util.FileFormatConstants;
 import org.apache.doris.common.util.FileFormatUtils;
 import org.apache.doris.common.util.NetUtils;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
+import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
 import org.apache.doris.datasource.tvf.source.TVFScanNode;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.planner.PlanNodeId;
@@ -63,15 +64,12 @@ import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileScanRange;
 import org.apache.doris.thrift.TFileScanRangeParams;
-import org.apache.doris.thrift.TFileTextScanRangeParams;
 import org.apache.doris.thrift.TFileType;
 import org.apache.doris.thrift.THdfsParams;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPrimitiveType;
 import org.apache.doris.thrift.TStatusCode;
-import org.apache.doris.thrift.TTextSerdeType;
 
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -110,25 +108,9 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     protected Map<String, String> locationProperties = Maps.newHashMap();
     protected String filePath;
 
-    protected TFileFormatType fileFormatType;
-
     protected Optional<String> resourceName = Optional.empty();
 
-    private TFileCompressType compressionType;
-    private String headerType = "";
-
-    private TTextSerdeType textSerdeType = TTextSerdeType.JSON_TEXT_SERDE;
-    private String columnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
-    private String lineDelimiter = FileFormatConstants.DEFAULT_LINE_DELIMITER;
-    private byte enclose = 0;
-    private String jsonRoot = "";
-    private String jsonPaths = "";
-    private boolean stripOuterArray;
-    private boolean readJsonByLine;
-    private boolean numAsString;
-    private boolean fuzzyParse;
-    private boolean trimDoubleQuotes;
-    private int skipLines;
+    public FileFormatProperties fileFormatProperties;
     private long tableId;
 
     public abstract TFileType getTFileType();
@@ -138,11 +120,11 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     public abstract BrokerDesc getBrokerDesc();
 
     public TFileFormatType getTFileFormatType() {
-        return fileFormatType;
+        return fileFormatProperties.getFileFormatType();
     }
 
     public TFileCompressType getTFileCompressType() {
-        return compressionType;
+        return fileFormatProperties.getCompressionType();
     }
 
     public Map<String, String> getLocationProperties() {
@@ -179,94 +161,15 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
         copiedProps.putAll(mergedProperties);
 
-        String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "").toLowerCase();
-        String defaultColumnSeparator = FileFormatConstants.DEFAULT_COLUMN_SEPARATOR;
-        switch (formatString) {
-            case "csv":
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "hive_text":
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                defaultColumnSeparator = FileFormatConstants.DEFAULT_HIVE_TEXT_COLUMN_SEPARATOR;
-                this.textSerdeType = TTextSerdeType.HIVE_TEXT_SERDE;
-                break;
-            case "csv_with_names":
-                this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES;
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "csv_with_names_and_types":
-                this.headerType = FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES;
-                this.fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
-                break;
-            case "parquet":
-                this.fileFormatType = TFileFormatType.FORMAT_PARQUET;
-                break;
-            case "orc":
-                this.fileFormatType = TFileFormatType.FORMAT_ORC;
-                break;
-            case "json":
-                this.fileFormatType = TFileFormatType.FORMAT_JSON;
-                break;
-            case "avro":
-                this.fileFormatType = TFileFormatType.FORMAT_AVRO;
-                break;
-            case "wal":
-                this.fileFormatType = TFileFormatType.FORMAT_WAL;
-                break;
-            default:
-                throw new AnalysisException("format:" + formatString + " is 
not supported.");
-        }
-
         tableId = Long.valueOf(getOrDefaultAndRemove(copiedProps, PROP_TABLE_ID, "-1"));
-        columnSeparator = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COLUMN_SEPARATOR,
-                defaultColumnSeparator);
-        if (Strings.isNullOrEmpty(columnSeparator)) {
-            throw new AnalysisException("column_separator can not be empty.");
-        }
-        columnSeparator = Separator.convertSeparator(columnSeparator);
-
-        lineDelimiter = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_LINE_DELIMITER,
-                FileFormatConstants.DEFAULT_LINE_DELIMITER);
-        if (Strings.isNullOrEmpty(lineDelimiter)) {
-            throw new AnalysisException("line_delimiter can not be empty.");
-        }
-        lineDelimiter = Separator.convertSeparator(lineDelimiter);
 
-        String enclosedString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_ENCLOSE, "");
-        if (!Strings.isNullOrEmpty(enclosedString)) {
-            if (enclosedString.length() > 1) {
-                throw new AnalysisException("enclose should not be longer than 
one byte.");
-            }
-            enclose = (byte) enclosedString.charAt(0);
-            if (enclose == 0) {
-                throw new AnalysisException("enclose should not be byte [0].");
-            }
-        }
+        String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "").toLowerCase();
+        fileFormatProperties = FileFormatProperties.createFileFormatProperties(formatString);
+        fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
 
-        jsonRoot = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_ROOT, "");
-        jsonPaths = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_JSON_PATHS, "");
-        readJsonByLine = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_READ_JSON_BY_LINE, "")).booleanValue();
-        stripOuterArray = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_STRIP_OUTER_ARRAY, "")).booleanValue();
-        numAsString = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_NUM_AS_STRING, "")).booleanValue();
-        fuzzyParse = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FUZZY_PARSE, "")).booleanValue();
-        trimDoubleQuotes = Boolean.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_TRIM_DOUBLE_QUOTES, "")).booleanValue();
-        skipLines = Integer.valueOf(
-                getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_SKIP_LINES, "0")).intValue();
-
-        String compressTypeStr = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_COMPRESS_TYPE, "UNKNOWN");
-        try {
-            compressionType = Util.getFileCompressType(compressTypeStr);
-        } catch (IllegalArgumentException e) {
-            throw new AnalysisException("Compress type : " +  compressTypeStr + " is not supported.");
-        }
-        if (FileFormatUtils.isCsv(formatString)) {
+        if (fileFormatProperties instanceof CsvFileFormatProperties) {
             FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps,
-                    FileFormatConstants.PROP_CSV_SCHEMA, ""));
+                    CsvFileFormatProperties.PROP_CSV_SCHEMA, ""));
             if (LOG.isDebugEnabled()) {
                 LOG.debug("get csv schema: {}", csvSchema);
             }
@@ -293,29 +196,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     }
 
     public TFileAttributes getFileAttributes() {
-        TFileAttributes fileAttributes = new TFileAttributes();
-        TFileTextScanRangeParams fileTextScanRangeParams = new TFileTextScanRangeParams();
-        fileTextScanRangeParams.setColumnSeparator(this.columnSeparator);
-        fileTextScanRangeParams.setLineDelimiter(this.lineDelimiter);
-        if (enclose != 0) {
-            fileTextScanRangeParams.setEnclose(enclose);
-        }
-        fileAttributes.setTextParams(fileTextScanRangeParams);
-        if (this.fileFormatType == TFileFormatType.FORMAT_CSV_PLAIN) {
-            fileAttributes.setHeaderType(this.headerType);
-            fileAttributes.setTrimDoubleQuotes(trimDoubleQuotes);
-            fileAttributes.setSkipLines(skipLines);
-            fileAttributes.setEnableTextValidateUtf8(
-                    ConnectContext.get().getSessionVariable().enableTextValidateUtf8);
-        } else if (this.fileFormatType == TFileFormatType.FORMAT_JSON) {
-            fileAttributes.setJsonRoot(jsonRoot);
-            fileAttributes.setJsonpaths(jsonPaths);
-            fileAttributes.setReadJsonByLine(readJsonByLine);
-            fileAttributes.setStripOuterArray(stripOuterArray);
-            fileAttributes.setNumAsString(numAsString);
-            fileAttributes.setFuzzyParse(fuzzyParse);
-        }
-        return fileAttributes;
+        return fileFormatProperties.toTFileAttributes();
     }
 
     @Override
@@ -345,7 +226,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
             throw new AnalysisException("No Alive backends");
         }
 
-        if (this.fileFormatType == TFileFormatType.FORMAT_WAL) {
+        if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_WAL) {
             List<Column> fileColumns = new ArrayList<>();
             Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
             List<Column> tableColumns = table.getBaseSchema(true);
@@ -452,7 +333,7 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
                 Pair<Type, Integer> fieldType = getColumnType(typeNodes, start + parsedNodes);
                 PStructField structField = typeNodes.get(start).getStructFields(i);
                 fields.add(new StructField(structField.getName(), fieldType.key(), structField.getComment(),
-                                            structField.getContainsNull()));
+                        structField.getContainsNull()));
                 parsedNodes += fieldType.value();
             }
             type = new StructType(fields);
@@ -488,9 +369,11 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
     private PFetchTableSchemaRequest getFetchTableStructureRequest() throws TException {
         // set TFileScanRangeParams
         TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams();
-        fileScanRangeParams.setFormatType(fileFormatType);
+        fileScanRangeParams.setFormatType(fileFormatProperties.getFileFormatType());
         fileScanRangeParams.setProperties(locationProperties);
-        fileScanRangeParams.setTextSerdeType(textSerdeType);
+        if (fileFormatProperties instanceof CsvFileFormatProperties) {
+            fileScanRangeParams.setTextSerdeType(((CsvFileFormatProperties) fileFormatProperties).getTextSerdeType());
+        }
         fileScanRangeParams.setFileAttributes(getFileAttributes());
         ConnectContext ctx = ConnectContext.get();
         fileScanRangeParams.setLoadId(ctx.queryId());
@@ -529,7 +412,8 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         TFileRangeDesc fileRangeDesc = new TFileRangeDesc();
         fileRangeDesc.setLoadId(ctx.queryId());
         fileRangeDesc.setFileType(getTFileType());
-        fileRangeDesc.setCompressType(Util.getOrInferCompressType(compressionType, firstFile.getPath()));
+        fileRangeDesc.setCompressType(Util.getOrInferCompressType(
+                fileFormatProperties.getCompressionType(), firstFile.getPath()));
         fileRangeDesc.setPath(firstFile.getPath());
         fileRangeDesc.setStartOffset(0);
         fileRangeDesc.setSize(firstFile.getSize());
@@ -547,9 +431,10 @@ public abstract class ExternalFileTableValuedFunction extends TableValuedFunctio
         if (fileStatus.isIsDir() || fileStatus.size == 0) {
             return true;
         }
-        if (Util.isCsvFormat(fileFormatType) || fileFormatType == TFileFormatType.FORMAT_JSON) {
+        if (Util.isCsvFormat(fileFormatProperties.getFileFormatType())
+                || fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_JSON) {
             int magicNumberBytes = 0;
-            switch (compressionType) {
+            switch (fileFormatProperties.getCompressionType()) {
                 case GZ:
                     magicNumberBytes = 20;
                     break;
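
Taken together, the TVF side now delegates all per-format parsing; a sketch of
the resulting flow (illustrative, not part of the patch; the "format" key is
FileFormatConstants.PROP_FORMAT as used above):

    import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
    import org.apache.doris.thrift.TFileAttributes;

    import java.util.HashMap;
    import java.util.Map;

    public class TvfFormatFlowSketch {
        public static TFileAttributes resolve(Map<String, String> userProps) {
            Map<String, String> copied = new HashMap<>(userProps);
            String format = copied.getOrDefault("format", "").toLowerCase();
            copied.remove("format");
            // The factory replaces the switch statement removed above.
            FileFormatProperties fmt = FileFormatProperties.createFileFormatProperties(format);
            // Validates and consumes the remaining format-specific keys.
            fmt.analyzeFileFormatProperties(copied, true);
            // Per-format attributes replace the hand-rolled branches in getFileAttributes().
            return fmt.toTFileAttributes();
        }
    }
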
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
index 5044f045c31..72573a2355f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HttpStreamTableValuedFunction.java
@@ -40,9 +40,9 @@ public class HttpStreamTableValuedFunction extends ExternalFileTableValuedFuncti
         // 1. analyze common properties
         super.parseCommonProperties(properties);
 
-        if (fileFormatType == TFileFormatType.FORMAT_PARQUET
-                || fileFormatType == TFileFormatType.FORMAT_AVRO
-                || fileFormatType == TFileFormatType.FORMAT_ORC) {
+        if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_PARQUET
+                || fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_AVRO
+                || fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_ORC) {
             throw new AnalysisException("http_stream does not yet support parquet, avro and orc");
         }
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..a7fc534e0de
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/AvroFileFormatPropertiesTest.java
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class AvroFileFormatPropertiesTest {
+
+    private AvroFileFormatProperties avroFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        avroFileFormatProperties = new AvroFileFormatProperties();
+    }
+
+    @Test
+    public void testAnalyzeFileFormatProperties() {
+        Map<String, String> properties = new HashMap<>();
+        // Add properties if needed
+        avroFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..a496378b5e5
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/CsvFileFormatPropertiesTest.java
@@ -0,0 +1,224 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileCompressType;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class CsvFileFormatPropertiesTest {
+
+    private CsvFileFormatProperties csvFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        csvFileFormatProperties = new CsvFileFormatProperties();
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValid() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, ",");
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "\n");
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "1");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals(",", csvFileFormatProperties.getColumnSeparator());
+        Assert.assertEquals("\n", csvFileFormatProperties.getLineDelimiter());
+        Assert.assertEquals(1, csvFileFormatProperties.getSkipLines());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidSeparator() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, "");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidLineDelimiter() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidEnclose() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "invalid");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidEnclose() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "\"");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals((byte) '"', csvFileFormatProperties.getEnclose());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSkipLinesNegative() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "-1");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSkipLinesLargeValue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "1000");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(1000, csvFileFormatProperties.getSkipLines());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesTrimDoubleQuotesTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "true");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesTrimDoubleQuotesFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "false");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidCompressType() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "invalid");
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.UNKNOWN, csvFileFormatProperties.getCompressionType());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidCompressType() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COMPRESS_TYPE, "gz");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.GZ, csvFileFormatProperties.getCompressionType());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesEmptyCsvSchema() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_CSV_SCHEMA, "");
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidEncloseMultipleCharacters() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "\"\"");
+
+        Assert.assertThrows(AnalysisException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidEncloseEmpty() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_ENCLOSE, "");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(0, csvFileFormatProperties.getEnclose());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSkipLinesAsString() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_SKIP_LINES, "abc");
+
+        Assert.assertThrows(NumberFormatException.class, () -> {
+            csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        });
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidColumnSeparator() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_COLUMN_SEPARATOR, ";");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(";", csvFileFormatProperties.getColumnSeparator());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesLineDelimiterAsString() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "abc");
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidLineDelimiter() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_LINE_DELIMITER, "\r\n");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("\r\n", 
csvFileFormatProperties.getLineDelimiter());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidTrimDoubleQuotes() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "true");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidTrimDoubleQuotes() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(CsvFileFormatProperties.PROP_TRIM_DOUBLE_QUOTES, "invalid");
+
+        csvFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, csvFileFormatProperties.isTrimDoubleQuotes());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/FileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/FileFormatPropertiesTest.java
new file mode 100644
index 00000000000..74d8d0db2ad
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/FileFormatPropertiesTest.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class FileFormatPropertiesTest {
+
+    @Test
+    public void testCreateFileFormatPropertiesInvalidFormat() {
+        Assert.assertThrows(AnalysisException.class, () -> {
+            FileFormatProperties.createFileFormatProperties("invalid_format");
+        });
+    }
+}
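
The factory test above only covers the failure path; a positive-path sketch
(hypothetical, not in the patch; it assumes the factory maps "csv" to
CsvFileFormatProperties, mirroring the switch removed from
ExternalFileTableValuedFunction):

    @Test
    public void testCreateFileFormatPropertiesCsv() {
        FileFormatProperties props = FileFormatProperties.createFileFormatProperties("csv");
        Assert.assertTrue(props instanceof CsvFileFormatProperties);
    }
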
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..f614d322386
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/JsonFileFormatPropertiesTest.java
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class JsonFileFormatPropertiesTest {
+
+    private JsonFileFormatProperties jsonFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        jsonFileFormatProperties = new JsonFileFormatProperties();
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesEmpty() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals("", jsonFileFormatProperties.getJsonRoot());
+        Assert.assertEquals("", jsonFileFormatProperties.getJsonPaths());
+        Assert.assertEquals(false, jsonFileFormatProperties.isStripOuterArray());
+        Assert.assertEquals(false, jsonFileFormatProperties.isReadJsonByLine());
+        Assert.assertEquals(false, jsonFileFormatProperties.isNumAsString());
+        Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidJsonRoot() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_ROOT, "data.items");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("data.items", 
jsonFileFormatProperties.getJsonRoot());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesValidJsonPaths() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS,
+                "[\"$.name\", \"$.age\", \"$.city\"]");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("[\"$.name\", \"$.age\", \"$.city\"]", 
jsonFileFormatProperties.getJsonPaths());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesStripOuterArrayTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isStripOuterArray());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesStripOuterArrayFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isStripOuterArray());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesReadJsonByLineTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesReadJsonByLineFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isReadJsonByLine());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesNumAsStringTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isNumAsString());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesNumAsStringFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isNumAsString());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesFuzzyParseTrue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(true, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesFuzzyParseFalse() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "false");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesInvalidBooleanValue() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "invalid");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(false, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesAllProperties() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_ROOT, "data.records");
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS, "[\"$.id\", \"$.name\"]");
+        properties.put(JsonFileFormatProperties.PROP_STRIP_OUTER_ARRAY, "true");
+        properties.put(JsonFileFormatProperties.PROP_READ_JSON_BY_LINE, "true");
+        properties.put(JsonFileFormatProperties.PROP_NUM_AS_STRING, "true");
+        properties.put(JsonFileFormatProperties.PROP_FUZZY_PARSE, "true");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals("data.records", jsonFileFormatProperties.getJsonRoot());
+        Assert.assertEquals("[\"$.id\", \"$.name\"]", jsonFileFormatProperties.getJsonPaths());
+        Assert.assertEquals(true, jsonFileFormatProperties.isStripOuterArray());
+        Assert.assertEquals(true, jsonFileFormatProperties.isReadJsonByLine());
+        Assert.assertEquals(true, jsonFileFormatProperties.isNumAsString());
+        Assert.assertEquals(true, jsonFileFormatProperties.isFuzzyParse());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesSpecialCharactersInJsonRoot() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_ROOT, "data.special@#$%^&*()");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("data.special@#$%^&*()", jsonFileFormatProperties.getJsonRoot());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesComplexJsonPaths() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS,
+                "[\"$.deeply.nested[0].array[*].field\", \"$.complex.path[?(@.type=='value')]\"]");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("[\"$.deeply.nested[0].array[*].field\", \"$.complex.path[?(@.type=='value')]\"]",
+                jsonFileFormatProperties.getJsonPaths());
+    }
+
+    @Test
+    public void testAnalyzeFileFormatPropertiesEmptyJsonPaths() throws AnalysisException {
+        Map<String, String> properties = new HashMap<>();
+        properties.put(JsonFileFormatProperties.PROP_JSON_PATHS, "");
+
+        jsonFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals("", jsonFileFormatProperties.getJsonPaths());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..0a63d0cec69
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/OrcFileFormatPropertiesTest.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TFileCompressType;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class OrcFileFormatPropertiesTest {
+
+    private OrcFileFormatProperties orcFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        orcFileFormatProperties = new OrcFileFormatProperties();
+    }
+
+    @Test
+    public void testAnalyzeFileFormatProperties() {
+        Map<String, String> properties = new HashMap<>();
+        // Add properties if needed
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.ZLIB, orcFileFormatProperties.getOrcCompressionType());
+    }
+
+    @Test
+    public void testSupportedCompressionTypes() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "plain");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.PLAIN, orcFileFormatProperties.getOrcCompressionType());
+
+        properties.put("compress_type", "snappy");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.SNAPPYBLOCK, orcFileFormatProperties.getOrcCompressionType());
+
+        properties.put("compress_type", "zlib");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.ZLIB, orcFileFormatProperties.getOrcCompressionType());
+
+        properties.put("compress_type", "zstd");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.ZSTD, orcFileFormatProperties.getOrcCompressionType());
+    }
+
+    @Test
+    public void testCompressionTypeCaseInsensitive() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "SNAPPY");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TFileCompressType.SNAPPYBLOCK, orcFileFormatProperties.getOrcCompressionType());
+    }
+
+    @Test(expected = org.apache.doris.nereids.exceptions.AnalysisException.class)
+    public void testInvalidCompressionType() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "invalid_type");
+        orcFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testFullTResultFileSinkOptions() {
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        orcFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(orcFileFormatProperties.getOrcCompressionType(), sinkOptions.getOrcCompressionType());
+        Assert.assertEquals(1, sinkOptions.getOrcWriterVersion());
+    }
+
+    @Test
+    public void testToTFileAttributes() {
+        TFileAttributes attrs = orcFileFormatProperties.toTFileAttributes();
+        Assert.assertNotNull(attrs);
+        Assert.assertNotNull(attrs.getTextParams());
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..754d857613f
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/ParquetFileFormatPropertiesTest.java
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.apache.doris.nereids.exceptions.AnalysisException;
+import org.apache.doris.thrift.TFileAttributes;
+import org.apache.doris.thrift.TParquetCompressionType;
+import org.apache.doris.thrift.TParquetVersion;
+import org.apache.doris.thrift.TResultFileSinkOptions;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ParquetFileFormatPropertiesTest {
+
+    private ParquetFileFormatProperties parquetFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        parquetFileFormatProperties = new ParquetFileFormatProperties();
+    }
+
+    @Test
+    public void testAnalyzeFileFormatProperties() {
+        Map<String, String> properties = new HashMap<>();
+        // Add properties if needed
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        Assert.assertEquals(TParquetCompressionType.SNAPPY, parquetFileFormatProperties.getParquetCompressionType());
+        Assert.assertEquals(false, parquetFileFormatProperties.isParquetDisableDictionary());
+    }
+
+    @Test
+    public void testSupportedCompressionTypes() {
+        Map<String, String> properties = new HashMap<>();
+        String[] types = {"snappy", "gzip", "brotli", "zstd", "lz4", "plain"};
+        TParquetCompressionType[] expected = {
+            TParquetCompressionType.SNAPPY,
+            TParquetCompressionType.GZIP,
+            TParquetCompressionType.BROTLI,
+            TParquetCompressionType.ZSTD,
+            TParquetCompressionType.LZ4,
+            TParquetCompressionType.UNCOMPRESSED
+        };
+        for (int i = 0; i < types.length; i++) {
+            properties.put("compress_type", types[i]);
+            parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+            Assert.assertEquals(expected[i], parquetFileFormatProperties.getParquetCompressionType());
+        }
+    }
+
+    @Test
+    public void testCompressionTypeCaseInsensitive() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "SNAPPY");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertEquals(TParquetCompressionType.SNAPPY, parquetFileFormatProperties.getParquetCompressionType());
+    }
+
+    @Test(expected = AnalysisException.class)
+    public void testInvalidCompressionType() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("compress_type", "invalid_type");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+
+    @Test
+    public void testParquetDisableDictionary() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("parquet.disable_dictionary", "true");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertTrue(parquetFileFormatProperties.isParquetDisableDictionary());
+        properties.put("parquet.disable_dictionary", "false");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        Assert.assertFalse(parquetFileFormatProperties.isParquetDisableDictionary());
+    }
+
+    @Test
+    public void testParquetVersion() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("parquet.version", "v1");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(TParquetVersion.PARQUET_1_0, sinkOptions.getParquetVersion());
+
+        properties.put("parquet.version", "latest");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+        sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(TParquetVersion.PARQUET_2_LATEST, sinkOptions.getParquetVersion());
+    }
+
+    @Test
+    public void testParquetVersionInvalid() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("parquet.version", "invalid");
+        parquetFileFormatProperties.analyzeFileFormatProperties(properties, true);
+
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(TParquetVersion.PARQUET_1_0, sinkOptions.getParquetVersion());
+    }
+
+    @Test
+    public void testFullTResultFileSinkOptions() {
+        TResultFileSinkOptions sinkOptions = new TResultFileSinkOptions();
+        parquetFileFormatProperties.fullTResultFileSinkOptions(sinkOptions);
+        Assert.assertEquals(parquetFileFormatProperties.getParquetCompressionType(), sinkOptions.getParquetCompressionType());
+        Assert.assertEquals(parquetFileFormatProperties.isParquetDisableDictionary(), sinkOptions.isParquetDisableDictionary());
+    }
+
+    @Test
+    public void testToTFileAttributes() {
+        TFileAttributes attrs = parquetFileFormatProperties.toTFileAttributes();
+        Assert.assertNotNull(attrs);
+        Assert.assertNotNull(attrs.getTextParams());
+    }
+}
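
For readers skimming the patch, the write path these Parquet tests exercise can be sketched end to end. This is an illustrative sketch only, not code from the commit; it uses just the names the tests above call (imports as in ParquetFileFormatPropertiesTest), and the meaning of the boolean argument to analyzeFileFormatProperties is taken from its use in the tests:

    // user-supplied properties, as they would arrive from OUTFILE or a TVF
    Map<String, String> props = new HashMap<>();
    props.put("compress_type", "zstd");               // matched case-insensitively
    props.put("parquet.disable_dictionary", "true");
    props.put("parquet.version", "latest");           // unrecognized values fall back to PARQUET_1_0

    ParquetFileFormatProperties fmt = new ParquetFileFormatProperties();
    fmt.analyzeFileFormatProperties(props, true);     // validates and stores the settings

    // write path: copy the analyzed settings into the result sink options
    TResultFileSinkOptions sink = new TResultFileSinkOptions();
    fmt.fullTResultFileSinkOptions(sink);             // sink now carries ZSTD and PARQUET_2_LATEST
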
diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/WalFileFormatPropertiesTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/WalFileFormatPropertiesTest.java
new file mode 100644
index 00000000000..d94b49aca97
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/property/fileformat/WalFileFormatPropertiesTest.java
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.property.fileformat;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WalFileFormatPropertiesTest {
+
+    private WalFileFormatProperties walFileFormatProperties;
+
+    @Before
+    public void setUp() {
+        walFileFormatProperties = new WalFileFormatProperties();
+    }
+
+    @Test
+    public void testAnalyzeFileFormatProperties() {
+        Map<String, String> properties = new HashMap<>();
+        // no properties are set; the test passes as long as analysis does not throw
+        walFileFormatProperties.analyzeFileFormatProperties(properties, true);
+    }
+}
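
In contrast with the Parquet case, the WAL test sets no properties at all, so it only proves that analysis is a safe no-op. A minimal sketch of the shared contract follows; it is illustrative only and assumes FileFormatProperties is the common base type of the per-format classes added in this commit, as the file listing suggests:

    // both formats are driven through the same entry point
    FileFormatProperties fmt = new WalFileFormatProperties();
    fmt.analyzeFileFormatProperties(new HashMap<String, String>(), true);  // no options to validate; must not throw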

