This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2bdaeb6a07 [Fix][Connector-File] Fix parquet support user config schema (#9596)
2bdaeb6a07 is described below
commit 2bdaeb6a07e3b59b9afd894cde701d85ae447c2a
Author: JeremyXin <[email protected]>
AuthorDate: Wed Jul 23 10:05:48 2025 +0800
[Fix][Connector-File] Fix parquet support user config schema (#9596)
---
.../apache/seatunnel/api/table/type/TypeUtil.java | 5 +-
.../file/hdfs/source/BaseHdfsFileSource.java | 8 +-
.../file/source/reader/ParquetReadStrategy.java | 131 +++++++++++++----
.../file/writer/ParquetReadStrategyTest.java | 159 ++++++++++++++++++++-
.../resources/test_user_config_read_parquet.conf | 29 ++++
5 files changed, 301 insertions(+), 31 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
index b8df6d80e5..84fbfd0670 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
@@ -37,7 +37,10 @@ public class TypeUtil {
return to.getSqlType() == SqlType.BIGINT;
}
if (from.getSqlType() == SqlType.FLOAT) {
- return to.getSqlType() == SqlType.DOUBLE;
+ return to.getSqlType() == SqlType.DOUBLE || to.getSqlType() ==
SqlType.DECIMAL;
+ }
+ if (from.getSqlType() == SqlType.DOUBLE) {
+ return to.getSqlType() == SqlType.DECIMAL;
}
return false;
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 29cd0ec39c..7e93e39cec 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -107,19 +107,23 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
.toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(ConnectorCommonOptions.SCHEMA.key())) {
+ CatalogTable userDefinedCatalogTable =
CatalogTableUtil.buildWithConfig(pluginConfig);
switch (fileFormat) {
case CSV:
case TEXT:
case JSON:
case EXCEL:
case XML:
- CatalogTable userDefinedCatalogTable =
- CatalogTableUtil.buildWithConfig(pluginConfig);
readStrategy.setCatalogTable(userDefinedCatalogTable);
rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
case PARQUET:
+ rowType =
+
readStrategy.getSeaTunnelRowTypeInfoWithUserConfigRowType(
+ filePaths.get(0),
+
userDefinedCatalogTable.getSeaTunnelRowType());
+ break;
case BINARY:
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 37d03b8ab7..2c2e134f4c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -58,6 +58,8 @@ import org.apache.parquet.schema.Type;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.time.Instant;
@@ -71,6 +73,8 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
+import static org.apache.seatunnel.api.table.type.TypeUtil.canConvert;
+
@Slf4j
public class ParquetReadStrategy extends AbstractReadStrategy {
private static final byte[] PARQUET_MAGIC =
@@ -185,14 +189,31 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
resolveObject(value, valueType)));
return dataMap;
case BOOLEAN:
+ return Boolean.parseBoolean(field.toString());
case INT:
+ return Integer.parseInt(field.toString());
case BIGINT:
+ return Long.parseLong(field.toString());
case FLOAT:
+ return Float.parseFloat(field.toString());
case DOUBLE:
+ return Double.parseDouble(field.toString());
case DECIMAL:
+ if (field instanceof Float || field instanceof Double) {
+ DecimalType decimalType = (DecimalType) fieldType;
+ return new BigDecimal(field.toString())
+ .setScale(decimalType.getScale(),
RoundingMode.HALF_UP);
+ }
+ return field;
case DATE:
return field;
case STRING:
+ if (field instanceof ByteBuffer) {
+ ByteBuffer buffer = (ByteBuffer) field;
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes, 0, bytes.length);
+ return new String(bytes);
+ }
return field.toString();
case TINYINT:
return Byte.parseByte(field.toString());
@@ -238,12 +259,18 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws
FileConnectorException {
- return getSeaTunnelRowTypeInfo(TablePath.DEFAULT, path);
+ return getSeaTunnelRowTypeInfoWithUserConfigRowType(path, null);
}
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath,
String path)
throws FileConnectorException {
+ return getSeaTunnelRowTypeInfoWithUserConfigRowType(path, null);
+ }
+
+ @Override
+ public SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
+ String path, SeaTunnelRowType configRowType) throws
FileConnectorException {
ParquetMetadata metadata;
try (ParquetFileReader reader =
hadoopFileSystemProxy.doWithHadoopAuth(
@@ -259,6 +286,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
throw new FileConnectorException(
CommonErrorCodeDeprecated.READER_OPERATION_FAILED,
errorMsg, e);
}
+
FileMetaData fileMetaData = metadata.getFileMetaData();
MessageType originalSchema = fileMetaData.getSchema();
if (readColumns.isEmpty()) {
@@ -270,62 +298,66 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
SeaTunnelDataType<?>[] types = new
SeaTunnelDataType[readColumns.size()];
indexes = new int[readColumns.size()];
buildColumnsWithErrorCheck(
- tablePath,
+ TablePath.DEFAULT,
IntStream.range(0, readColumns.size()).iterator(),
i -> {
fields[i] = readColumns.get(i);
Type type = originalSchema.getType(fields[i]);
int fieldIndex = originalSchema.getFieldIndex(fields[i]);
indexes[i] = fieldIndex;
- types[i] = parquetType2SeaTunnelType(type, fields[i]);
+ SeaTunnelDataType<?> configDataType =
+ getConfigFieldType(configRowType, fields[i]);
+ types[i] = parquetType2SeaTunnelType(type, configDataType,
fields[i]);
});
+
seaTunnelRowType = new SeaTunnelRowType(fields, types);
seaTunnelRowTypeWithPartition = mergePartitionTypes(path,
seaTunnelRowType);
return getActualSeaTunnelRowTypeInfo();
}
- private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type, String
name) {
+ private SeaTunnelDataType<?> parquetType2SeaTunnelType(
+ Type type, SeaTunnelDataType<?> configType, String name) {
if (type.isPrimitive()) {
switch (type.asPrimitiveType().getPrimitiveTypeName()) {
case INT32:
OriginalType originalType =
type.asPrimitiveType().getOriginalType();
if (originalType == null) {
- return BasicType.INT_TYPE;
+ return getFinalType(BasicType.INT_TYPE, configType);
}
switch (type.asPrimitiveType().getOriginalType()) {
case INT_8:
- return BasicType.BYTE_TYPE;
+ return getFinalType(BasicType.BYTE_TYPE,
configType);
case INT_16:
- return BasicType.SHORT_TYPE;
+ return getFinalType(BasicType.SHORT_TYPE,
configType);
case INT_32:
- return BasicType.INT_TYPE;
+ return getFinalType(BasicType.INT_TYPE,
configType);
case DATE:
- return LocalTimeType.LOCAL_DATE_TYPE;
+ return getFinalType(LocalTimeType.LOCAL_DATE_TYPE,
configType);
default:
throw CommonError.convertToSeaTunnelTypeError(
PARQUET, type.toString(), name);
}
case INT64:
if (type.asPrimitiveType().getOriginalType() ==
OriginalType.TIMESTAMP_MILLIS) {
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ return
getFinalType(LocalTimeType.LOCAL_DATE_TIME_TYPE, configType);
}
- return BasicType.LONG_TYPE;
+ return getFinalType(BasicType.LONG_TYPE, configType);
case INT96:
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ return getFinalType(LocalTimeType.LOCAL_DATE_TIME_TYPE,
configType);
case BINARY:
if (type.asPrimitiveType().getOriginalType() == null) {
- return PrimitiveByteArrayType.INSTANCE;
+ return getFinalType(PrimitiveByteArrayType.INSTANCE,
configType);
}
- return BasicType.STRING_TYPE;
+ return getFinalType(BasicType.STRING_TYPE, configType);
case FLOAT:
- return BasicType.FLOAT_TYPE;
+ return getFinalType(BasicType.FLOAT_TYPE, configType);
case DOUBLE:
- return BasicType.DOUBLE_TYPE;
+ return getFinalType(BasicType.DOUBLE_TYPE, configType);
case BOOLEAN:
- return BasicType.BOOLEAN_TYPE;
+ return getFinalType(BasicType.BOOLEAN_TYPE, configType);
case FIXED_LEN_BYTE_ARRAY:
if (type.getLogicalTypeAnnotation() == null) {
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ return
getFinalType(LocalTimeType.LOCAL_DATE_TIME_TYPE, configType);
}
String typeInfo =
type.getLogicalTypeAnnotation()
@@ -336,7 +368,8 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
String[] splits = typeInfo.split(",");
int precision = Integer.parseInt(splits[0]);
int scale = Integer.parseInt(splits[1]);
- return new DecimalType(precision, scale);
+ DecimalType decimalType = new DecimalType(precision,
scale);
+ return getFinalType(decimalType, configType);
default:
throw CommonError.convertToSeaTunnelTypeError("Parquet",
type.toString(), name);
}
@@ -350,8 +383,15 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
SeaTunnelDataType<?>[] seaTunnelDataTypes = new
SeaTunnelDataType<?>[fields.size()];
for (int i = 0; i < fields.size(); i++) {
Type fieldType = fields.get(i);
+ SeaTunnelDataType<?> configDataType = null;
+ if (configType instanceof SeaTunnelRowType) {
+ SeaTunnelRowType configRowType = (SeaTunnelRowType)
configType;
+ if (configRowType.getFieldTypes().length > i) {
+ configDataType = configRowType.getFieldType(i);
+ }
+ }
SeaTunnelDataType<?> seaTunnelDataType =
- parquetType2SeaTunnelType(fields.get(i), name);
+ parquetType2SeaTunnelType(fields.get(i),
configDataType, name);
fieldNames[i] = fieldType.getName();
seaTunnelDataTypes[i] = seaTunnelDataType;
}
@@ -360,11 +400,24 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
switch (logicalTypeAnnotation.toOriginalType()) {
case MAP:
GroupType groupType =
type.asGroupType().getType(0).asGroupType();
- SeaTunnelDataType<?> keyType =
-
parquetType2SeaTunnelType(groupType.getType(0), name);
- SeaTunnelDataType<?> valueType =
-
parquetType2SeaTunnelType(groupType.getType(1), name);
- return new MapType<>(keyType, valueType);
+ if (configType instanceof MapType) {
+ SeaTunnelDataType<?> keyDataType =
+ ((MapType<?, ?>) configType).getKeyType();
+ SeaTunnelDataType<?> valueDataType =
+ ((MapType<?, ?>)
configType).getValueType();
+ keyDataType =
+ parquetType2SeaTunnelType(
+ groupType.getType(0), keyDataType,
name);
+ valueDataType =
+ parquetType2SeaTunnelType(
+ groupType.getType(1),
valueDataType, name);
+
+ return new MapType<>(keyDataType, valueDataType);
+ } else {
+ return new MapType<>(
+
parquetType2SeaTunnelType(groupType.getType(0), null, name),
+
parquetType2SeaTunnelType(groupType.getType(1), null, name));
+ }
case LIST:
Type elementType;
try {
@@ -373,7 +426,13 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
elementType = type.asGroupType().getType(0);
}
SeaTunnelDataType<?> fieldType =
- parquetType2SeaTunnelType(elementType, name);
+ parquetType2SeaTunnelType(elementType, null,
name);
+ if (configType instanceof ArrayType) {
+ SeaTunnelDataType<?> seaTunnelDataType =
+ ((ArrayType) configType).getElementType();
+ fieldType =
+ parquetType2SeaTunnelType(elementType,
seaTunnelDataType, name);
+ }
switch (fieldType.getSqlType()) {
case STRING:
return ArrayType.STRING_ARRAY_TYPE;
@@ -420,4 +479,24 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
throw new
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
}
}
+
+ private SeaTunnelDataType<?> getFinalType(
+ SeaTunnelDataType<?> fileType, SeaTunnelDataType<?> configType) {
+ if (configType == null) {
+ return fileType;
+ }
+ return canConvert(fileType, configType) ? configType : fileType;
+ }
+
+ private SeaTunnelDataType<?> getConfigFieldType(
+ SeaTunnelRowType configRowType, String fieldName) {
+
+ if (configRowType == null) {
+ return null;
+ }
+
+ int fieldIndex =
Arrays.asList(configRowType.getFieldNames()).indexOf(fieldName);
+
+ return fieldIndex == -1 ? null :
configRowType.getFieldType(fieldIndex);
+ }
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 1050c4a2d9..e66393e263 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -28,9 +30,11 @@ import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
@@ -57,11 +61,17 @@ import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.io.IOException;
+import java.math.BigDecimal;
import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
+import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.TimeZone;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
@@ -249,6 +259,71 @@ public class ParquetReadStrategyTest {
parquetReadStrategy.read(NativeParquetWriter.DATA_FILE_PATH, "",
testCollector);
}
+ @DisabledOnOs(OS.WINDOWS)
+ @Test
+ public void testParquetWithUserConfigRowType() throws Exception {
+ AutoGenerateParquetData.generateTestData();
+ String path = AutoGenerateParquetData.DATA_FILE_PATH;
+
+ URL conf =
ParquetReadStrategyTest.class.getResource("/test_user_config_read_parquet.conf");
+ Assertions.assertNotNull(conf);
+ String confPath = Paths.get(conf.toURI()).toString();
+ Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
+ CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(pluginConfig);
+
+ ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+ LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+ parquetReadStrategy.init(localConf);
+
+ SeaTunnelRowType configRowType = catalogTable.getSeaTunnelRowType();
+ parquetReadStrategy.getSeaTunnelRowTypeInfoWithUserConfigRowType(path,
configRowType);
+
+ TestCollector testCollector = new TestCollector();
+ parquetReadStrategy.read(path, "default", testCollector);
+ List<SeaTunnelRow> rows = testCollector.getRows();
+ SeaTunnelRow row = rows.get(0);
+
+ // Verify whether the data type and type conversion are correct
+ // id convert to String
+ Assertions.assertEquals(String.class, row.getField(0).getClass());
+ Assertions.assertEquals(String.class, row.getField(1).getClass());
+ // salary convert to Double
+ Assertions.assertEquals(Double.class, row.getField(2).getClass());
+ Assertions.assertTrue(row.getField(3) instanceof String[]);
+ // age convert to Long
+ Assertions.assertEquals(Long.class, row.getField(4).getClass());
+ Assertions.assertEquals(Boolean.class, row.getField(5).getClass());
+ // score convert to Decimal
+ Assertions.assertEquals(BigDecimal.class, row.getField(6).getClass());
+ Assertions.assertEquals(BigDecimal.class, row.getField(7).getClass());
+ Assertions.assertEquals(LocalDate.class, row.getField(8).getClass());
+ Assertions.assertEquals(LocalDateTime.class,
row.getField(9).getClass());
+ Assertions.assertEquals(HashMap.class, row.getField(10).getClass());
+ Assertions.assertEquals(byte[].class, row.getField(11).getClass());
+ // binary_as_string convert to String
+ Assertions.assertEquals(String.class, row.getField(12).getClass());
+
+ Assertions.assertEquals("1", row.getField(0));
+ Assertions.assertEquals("Alice", row.getField(1));
+ Assertions.assertEquals(50000.0, row.getField(2));
+ String[] skills = (String[]) row.getField(3);
+ Assertions.assertEquals(2, skills.length);
+ Assertions.assertEquals("Java", skills[0]);
+ Assertions.assertEquals("Python", skills[1]);
+ Assertions.assertEquals(30L, row.getField(4));
+ Assertions.assertEquals(true, row.getField(5));
+ Assertions.assertEquals(new BigDecimal("98.50"), row.getField(6));
+ Assertions.assertEquals(new BigDecimal("1198.02"), row.getField(7));
+ Assertions.assertNotNull(row.getField(8));
+ Assertions.assertNotNull(row.getField(9));
+ Assertions.assertTrue(((HashMap<?, ?>)
row.getField(10)).containsKey("department"));
+ Assertions.assertArrayEquals(
+ "binary data example".getBytes(StandardCharsets.UTF_8),
(byte[]) row.getField(11));
+ Assertions.assertEquals("binary_as_string", row.getField(12));
+
+ AutoGenerateParquetData.deleteFile();
+ }
+
public static class TestCollector implements Collector<SeaTunnelRow> {
private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -294,12 +369,27 @@ public class ParquetReadStrategyTest {
public static void generateTestData() throws IOException {
deleteFile();
+
+ // create schema, which includes various data types
String schemaString =
-
"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"double\"},{\"name\":\"skills\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
+ "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"int\"},"
+ + "{\"name\":\"name\",\"type\":\"string\"},"
+ + "{\"name\":\"salary\",\"type\":\"float\"},"
+ +
"{\"name\":\"skills\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},"
+ + "{\"name\":\"age\",\"type\":\"int\"},"
+ + "{\"name\":\"active\",\"type\":\"boolean\"},"
+ + "{\"name\":\"score\",\"type\":\"double\"},"
+ +
"{\"name\":\"budget\",\"type\":{\"type\":\"fixed\",\"name\":\"BudgetDecimal\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":8,\"scale\":2}},"
+ +
"{\"name\":\"join_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
+ +
"{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
+ +
"{\"name\":\"properties\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},"
+ + "{\"name\":\"binary_data\",\"type\":\"bytes\"},"
+ +
"{\"name\":\"binary_as_string\",\"type\":\"bytes\"}"
+ + "]}";
Schema schema = new Schema.Parser().parse(schemaString);
Configuration conf = new Configuration();
-
Path file = new Path(DATA_FILE_PATH);
ParquetWriter<GenericRecord> writer =
@@ -309,26 +399,91 @@ public class ParquetReadStrategyTest {
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build();
+ // create first record
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", 1);
record1.put("name", "Alice");
record1.put("salary", 50000.0);
+ record1.put("age", 30);
+ record1.put("active", true);
+ record1.put("score", 98.5f);
+ record1.put("created_at", System.currentTimeMillis());
+
+ // Date type
+ record1.put("join_date", 20289);
+
+ // Decimal type
+ BigDecimal budget = new BigDecimal("1198.02");
+ Schema.Field budgetField = schema.getField("budget");
+ Schema budgetSchema = budgetField.schema();
+ Conversions.DecimalConversion decimalConversion = new
Conversions.DecimalConversion();
+ GenericFixed budgetFixed =
+ decimalConversion.toFixed(budget, budgetSchema,
budgetSchema.getLogicalType());
+ record1.put("budget", budgetFixed);
+
+ // Array type
GenericArray<Utf8> skills1 =
new GenericData.Array<>(2,
schema.getField("skills").schema());
skills1.add(new Utf8("Java"));
skills1.add(new Utf8("Python"));
record1.put("skills", skills1);
+
+ // Map type
+ Map<Utf8, Utf8> properties1 = new HashMap<>();
+ properties1.put(new Utf8("department"), new Utf8("Engineering"));
+ properties1.put(new Utf8("location"), new Utf8("Beijing"));
+ record1.put("properties", properties1);
+
+ // Binary type
+ record1.put(
+ "binary_data",
+ ByteBuffer.wrap("binary data
example".getBytes(StandardCharsets.UTF_8)));
+ record1.put(
+ "binary_as_string",
+
ByteBuffer.wrap("binary_as_string".getBytes(StandardCharsets.UTF_8)));
+
writer.write(record1);
+ // create second record
GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", 2);
record2.put("name", "Bob");
record2.put("salary", 60000.0);
+ record2.put("age", 35);
+ record2.put("active", false);
+ record2.put("score", 89.2f);
+ record2.put("created_at", System.currentTimeMillis() - 86400000);
+
+ // Date type
+ record2.put("join_date", 20288);
+
+ // Decimal type
+ BigDecimal budget2 = new BigDecimal("2394.13");
+ Schema.Field budgetField2 = schema.getField("budget");
+ Schema budgetSchema2 = budgetField2.schema();
+ GenericFixed budgetFixed2 =
+ decimalConversion.toFixed(
+ budget2, budgetSchema2,
budgetSchema2.getLogicalType());
+ record2.put("budget", budgetFixed2);
+
GenericArray<Utf8> skills2 =
new GenericData.Array<>(2,
schema.getField("skills").schema());
skills2.add(new Utf8("C++"));
skills2.add(new Utf8("Go"));
record2.put("skills", skills2);
+
+ Map<Utf8, Utf8> properties2 = new HashMap<>();
+ properties2.put(new Utf8("department"), new Utf8("Marketing"));
+ properties2.put(new Utf8("location"), new Utf8("Shanghai"));
+ record2.put("properties", properties2);
+
+ record2.put(
+ "binary_data",
+ ByteBuffer.wrap("another binary
example".getBytes(StandardCharsets.UTF_8)));
+ record2.put(
+ "binary_as_string",
+ ByteBuffer.wrap("another
binary_as_string".getBytes(StandardCharsets.UTF_8)));
+
writer.write(record2);
writer.close();
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_user_config_read_parquet.conf b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_user_config_read_parquet.conf
new file mode 100644
index 0000000000..bf1183662b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_user_config_read_parquet.conf
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+schema {
+ fields {
+ id = "string"
+ salary = "double"
+ age = "long"
+ score = "decimal(10,2)"
+ binary_as_string = "string"
+ properties = "map<string,string>"
+ }
+ }
+}