This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 22bf2889e5d [feature](tvf)(jni-avro)jni-avro scanner add complex data types (#26236)
22bf2889e5d is described below
commit 22bf2889e5d47175d18942fa02db47d1f6128763
Author: wudongliang <[email protected]>
AuthorDate: Thu Nov 9 13:58:49 2023 +0800
[feature](tvf)(jni-avro)jni-avro scanner add complex data types (#26236)
Support Avro's enum, record, and union data types
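
For context, the schema-side mapping this commit introduces: Avro enums surface
as STRING, records as STRUCT, and unions as a STRUCT whose fields are the
union's non-null branches. A minimal sketch with Avro's SchemaBuilder (the same
API the new AvroTypeUtilsTest uses); class and field names here are
illustrative only, not part of the commit:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class ComplexTypesSketch {
        public static void main(String[] args) {
            // enum -> STRING (the symbol name is read as text)
            Schema anEnum = SchemaBuilder.enumeration("color").symbols("RED", "GREEN");

            // record -> STRUCT<a:INT,b:DOUBLE>
            Schema aRecord = SchemaBuilder.record("point")
                    .fields()
                    .name("a").type().intType().noDefault()
                    .name("b").type().doubleType().noDefault()
                    .endRecord();

            // union -> null branches are dropped; the remaining branches become
            // STRUCT fields named after their member type, e.g. STRUCT<string:TEXT>
            Schema aUnion = SchemaBuilder.unionOf().nullType().and().stringType().endUnion();

            System.out.println(anEnum + "\n" + aRecord + "\n" + aUnion);
        }
    }
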
---
be/src/vec/exec/format/avro/avro_jni_reader.cpp | 44 +++---
.../avro/avro_all_types/all_type.avro | Bin 0 -> 1155 bytes
.../org/apache/doris/avro/AvroColumnValue.java | 34 ++++-
.../java/org/apache/doris/avro/AvroJNIScanner.java | 56 +-------
.../java/org/apache/doris/avro/AvroTypeUtils.java | 122 ++++++++++++++++
.../org/apache/doris/avro/AvroTypeUtilsTest.java | 105 ++++++++++++++
.../apache/doris/common/jni/vec/TableSchema.java | 10 +-
.../data/external_table_p0/tvf/test_tvf_avro.out | 73 ++++++++++
.../external_table_p0/tvf/test_tvf_avro.groovy | 154 +++++++++++++++++++++
9 files changed, 513 insertions(+), 85 deletions(-)
diff --git a/be/src/vec/exec/format/avro/avro_jni_reader.cpp b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
index e682ff9886d..f3cff19c04d 100644
--- a/be/src/vec/exec/format/avro/avro_jni_reader.cpp
+++ b/be/src/vec/exec/format/avro/avro_jni_reader.cpp
@@ -86,18 +86,10 @@ Status AvroJNIReader::init_fetch_table_reader(
{"file_type", std::to_string(type)},
{"is_get_table_schema", "false"},
{"hive.serde", "org.apache.hadoop.hive.serde2.avro.AvroSerDe"}};
- switch (type) {
- case TFileType::FILE_HDFS:
- required_param.insert(std::make_pair("uri", _params.hdfs_params.hdfs_conf.data()->value));
- break;
- case TFileType::FILE_S3:
- required_param.insert(std::make_pair("uri", _range.path));
+ if (type == TFileType::FILE_S3) {
required_param.insert(_params.properties.begin(), _params.properties.end());
- break;
- default:
- return Status::InternalError("unsupported file reader type: {}", std::to_string(type));
}
- required_param.insert(_params.properties.begin(), _params.properties.end());
+ required_param.insert(std::make_pair("uri", _range.path));
_jni_connector = std::make_unique<JniConnector>("org/apache/doris/avro/AvroJNIScanner",
required_param, column_names);
RETURN_IF_ERROR(_jni_connector->init(_colname_to_value_range));
@@ -144,8 +136,7 @@ Status AvroJNIReader::get_parsed_schema(std::vector<std::string>* col_names,
}
TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& column_schema) {
- ::doris::TPrimitiveType::type schema_type =
- static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt());
+ auto schema_type = static_cast< ::doris::TPrimitiveType::type>(column_schema["type"].GetInt());
switch (schema_type) {
case TPrimitiveType::INT:
case TPrimitiveType::STRING:
@@ -153,30 +144,35 @@ TypeDescriptor AvroJNIReader::convert_to_doris_type(const rapidjson::Value& colu
case TPrimitiveType::BOOLEAN:
case TPrimitiveType::DOUBLE:
case TPrimitiveType::FLOAT:
- return TypeDescriptor(thrift_to_type(schema_type));
+ case TPrimitiveType::BINARY:
+ return {thrift_to_type(schema_type)};
case TPrimitiveType::ARRAY: {
TypeDescriptor list_type(PrimitiveType::TYPE_ARRAY);
- list_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject()));
+ const rapidjson::Value& childColumns = column_schema["childColumns"];
+ list_type.add_sub_type(convert_to_doris_type(childColumns[0]));
return list_type;
}
case TPrimitiveType::MAP: {
TypeDescriptor map_type(PrimitiveType::TYPE_MAP);
-
+ const rapidjson::Value& childColumns = column_schema["childColumns"];
// The default type of AVRO MAP structure key is STRING
map_type.add_sub_type(PrimitiveType::TYPE_STRING);
- map_type.add_sub_type(convert_complex_type(column_schema["childColumn"].GetObject()));
+ map_type.add_sub_type(convert_to_doris_type(childColumns[1]));
return map_type;
}
+ case TPrimitiveType::STRUCT: {
+ TypeDescriptor struct_type(PrimitiveType::TYPE_STRUCT);
+ const rapidjson::Value& childColumns = column_schema["childColumns"];
+ for (auto i = 0; i < childColumns.Size(); i++) {
+ const rapidjson::Value& child = childColumns[i];
+ struct_type.add_sub_type(convert_to_doris_type(childColumns[i]),
+ std::string(child["name"].GetString()));
+ }
+ return struct_type;
+ }
default:
- return TypeDescriptor(PrimitiveType::INVALID_TYPE);
+ return {PrimitiveType::INVALID_TYPE};
}
}
-TypeDescriptor AvroJNIReader::convert_complex_type(
- const rapidjson::Document::ConstObject child_schema) {
- ::doris::TPrimitiveType::type child_schema_type =
- static_cast< ::doris::TPrimitiveType::type>(child_schema["type"].GetInt());
- return TypeDescriptor(thrift_to_type(child_schema_type));
-}
-
} // namespace doris::vectorized
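
The BE-side change above replaces the single-child "childColumn" object with a
recursive walk over a "childColumns" array: ARRAY recurses into element 0, MAP
reads its value type from element 1 (the key is fixed to STRING), and STRUCT
visits every element, keeping each child's "name". A minimal Java sketch of the
same walk, assuming Jackson (already used by the new unit test) and the
TPrimitiveType codes visible in that test's fixture (ARRAY=20, MAP=21,
STRUCT=22); this helper is illustrative, not part of the commit:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class SchemaWalkSketch {
        static final int ARRAY = 20;
        static final int MAP = 21;
        static final int STRUCT = 22;

        // Mirrors convert_to_doris_type over the serialized schema JSON.
        static String describe(JsonNode column) {
            int type = column.get("type").asInt();
            JsonNode children = column.get("childColumns");
            switch (type) {
                case ARRAY:
                    return "ARRAY<" + describe(children.get(0)) + ">";
                case MAP:
                    return "MAP<STRING," + describe(children.get(1)) + ">";
                case STRUCT: {
                    StringBuilder sb = new StringBuilder("STRUCT<");
                    for (int i = 0; i < children.size(); i++) {
                        if (i > 0) {
                            sb.append(',');
                        }
                        sb.append(children.get(i).get("name").asText())
                                .append(':')
                                .append(describe(children.get(i)));
                    }
                    return sb.append('>').toString();
                }
                default:
                    return "T" + type; // scalars map via thrift_to_type in the BE
            }
        }

        public static void main(String[] args) throws Exception {
            JsonNode col = new ObjectMapper().readTree(
                    "{\"name\":\"anArray\",\"type\":20,"
                            + "\"childColumns\":[{\"name\":null,\"type\":5,\"childColumns\":null}]}");
            System.out.println(describe(col)); // prints ARRAY<T5>
        }
    }
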
diff --git a/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/avro/avro_all_types/all_type.avro b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/avro/avro_all_types/all_type.avro
new file mode 100644
index 00000000000..c66017bb624
Binary files /dev/null and b/docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/avro/avro_all_types/all_type.avro differ
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java
index dd72c9aad50..77c6fba37dc 100644
--- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroColumnValue.java
@@ -19,17 +19,25 @@ package org.apache.doris.avro;
import org.apache.doris.common.jni.vec.ColumnValue;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Fixed;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map.Entry;
+import java.util.Objects;
public class AvroColumnValue implements ColumnValue {
@@ -42,6 +50,12 @@ public class AvroColumnValue implements ColumnValue {
}
private Object inspectObject() {
+ if (fieldData instanceof ByteBuffer) {
+ return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(((ByteBuffer) fieldData).array());
+ } else if (fieldData instanceof Fixed) {
+ return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(
+ ((GenericFixed) fieldData).bytes());
+ }
return ((PrimitiveObjectInspector) fieldInspector).getPrimitiveJavaObject(fieldData);
}
@@ -162,6 +176,24 @@ public class AvroColumnValue implements ColumnValue {
@Override
public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> values) {
-
+ StructObjectInspector inspector = (StructObjectInspector) fieldInspector;
+ List<? extends StructField> fields = inspector.getAllStructFieldRefs();
+ for (Integer idx : structFieldIndex) {
+ AvroColumnValue cv = null;
+ if (idx != null) {
+ StructField sf = fields.get(idx);
+ Object o;
+ if (fieldData instanceof GenericData.Record) {
+ GenericRecord record = (GenericRecord) inspector.getStructFieldData(fieldData, sf);
+ o = record.get(idx);
+ } else {
+ o = inspector.getStructFieldData(fieldData, sf);
+ }
+ if (Objects.nonNull(o)) {
+ cv = new AvroColumnValue(sf.getFieldObjectInspector(), o);
+ }
+ }
+ values.add(cv);
+ }
}
}
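
Two runtime shapes now back a BINARY column in inspectObject(): Avro "bytes"
values arrive as java.nio.ByteBuffer (unwrapped via array()) and "fixed" values
as GenericData.Fixed (unwrapped via bytes()). A small sketch of constructing
both shapes, purely illustrative (the schema name matches the one in the new
test, everything else is made up for the example):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;

    import java.nio.ByteBuffer;

    public class BinaryShapesSketch {
        public static void main(String[] args) {
            // "fixed" -> GenericData.Fixed, unwrapped with bytes()
            Schema fixedSchema = SchemaBuilder.fixed("myFixedType").size(16);
            GenericData.Fixed fixed = new GenericData.Fixed(fixedSchema, new byte[16]);

            // "bytes" -> ByteBuffer, unwrapped with array()
            ByteBuffer buffer = ByteBuffer.wrap(new byte[] {1, 2, 3});

            System.out.println(fixed.bytes().length + " / " + buffer.array().length);
        }
    }
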
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
index 11bce610d70..75cbc721e31 100644
--- a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroJNIScanner.java
@@ -21,12 +21,9 @@ import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.TableSchema;
-import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn;
import org.apache.doris.thrift.TFileType;
-import org.apache.doris.thrift.TPrimitiveType;
import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.JavaUtils;
@@ -40,10 +37,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
@@ -193,54 +187,6 @@ public class AvroJNIScanner extends JniScanner {
@Override
protected TableSchema parseTableSchema() throws UnsupportedOperationException {
Schema schema = avroReader.getSchema();
- List<Field> schemaFields = schema.getFields();
- List<SchemaColumn> schemaColumns = new ArrayList<>();
- for (Field schemaField : schemaFields) {
- Schema avroSchema = schemaField.schema();
- String columnName = schemaField.name().toLowerCase(Locale.ROOT);
-
- SchemaColumn schemaColumn = new SchemaColumn();
- TPrimitiveType tPrimitiveType = serializeSchemaType(avroSchema, schemaColumn);
- schemaColumn.setName(columnName);
- schemaColumn.setType(tPrimitiveType);
- schemaColumns.add(schemaColumn);
- }
- return new TableSchema(schemaColumns);
- }
-
- private TPrimitiveType serializeSchemaType(Schema avroSchema, SchemaColumn schemaColumn)
- throws UnsupportedOperationException {
- Schema.Type type = avroSchema.getType();
- switch (type) {
- case NULL:
- return TPrimitiveType.NULL_TYPE;
- case STRING:
- return TPrimitiveType.STRING;
- case INT:
- return TPrimitiveType.INT;
- case BOOLEAN:
- return TPrimitiveType.BOOLEAN;
- case LONG:
- return TPrimitiveType.BIGINT;
- case FLOAT:
- return TPrimitiveType.FLOAT;
- case BYTES:
- return TPrimitiveType.BINARY;
- case DOUBLE:
- return TPrimitiveType.DOUBLE;
- case ARRAY:
- SchemaColumn arrayChildColumn = new SchemaColumn();
- schemaColumn.addChildColumn(arrayChildColumn);
- arrayChildColumn.setType(serializeSchemaType(avroSchema.getElementType(), arrayChildColumn));
- return TPrimitiveType.ARRAY;
- case MAP:
- SchemaColumn mapChildColumn = new SchemaColumn();
- schemaColumn.addChildColumn(mapChildColumn);
- mapChildColumn.setType(serializeSchemaType(avroSchema.getValueType(), mapChildColumn));
- return TPrimitiveType.MAP;
- default:
- throw new UnsupportedOperationException("avro format: " + type.getName() + " is not supported.");
- }
+ return AvroTypeUtils.parseTableSchema(schema);
}
-
}
diff --git a/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java
new file mode 100644
index 00000000000..cd597fa4cfc
--- /dev/null
+++ b/fe/be-java-extensions/avro-scanner/src/main/java/org/apache/doris/avro/AvroTypeUtils.java
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.avro;
+
+import org.apache.doris.common.jni.vec.TableSchema;
+import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn;
+import org.apache.doris.thrift.TPrimitiveType;
+
+import com.google.common.base.Preconditions;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.commons.compress.utils.Lists;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class AvroTypeUtils {
+
+ protected static TableSchema parseTableSchema(Schema schema) throws UnsupportedOperationException {
+ List<Field> schemaFields = schema.getFields();
+ List<SchemaColumn> schemaColumns = new ArrayList<>();
+ for (Field schemaField : schemaFields) {
+ Schema avroSchema = schemaField.schema();
+ String columnName = schemaField.name();
+
+ SchemaColumn schemaColumn = new SchemaColumn();
+ TPrimitiveType tPrimitiveType = typeFromAvro(avroSchema, schemaColumn);
+ schemaColumn.setName(columnName);
+ schemaColumn.setType(tPrimitiveType);
+ schemaColumns.add(schemaColumn);
+ }
+ return new TableSchema(schemaColumns);
+ }
+
+ private static TPrimitiveType typeFromAvro(Schema avroSchema, SchemaColumn schemaColumn)
+ throws UnsupportedOperationException {
+ Schema.Type type = avroSchema.getType();
+ switch (type) {
+ case ENUM:
+ case STRING:
+ return TPrimitiveType.STRING;
+ case INT:
+ return TPrimitiveType.INT;
+ case BOOLEAN:
+ return TPrimitiveType.BOOLEAN;
+ case LONG:
+ return TPrimitiveType.BIGINT;
+ case FLOAT:
+ return TPrimitiveType.FLOAT;
+ case FIXED:
+ case BYTES:
+ return TPrimitiveType.BINARY;
+ case DOUBLE:
+ return TPrimitiveType.DOUBLE;
+ case ARRAY:
+ SchemaColumn arrayChildColumn = new SchemaColumn();
+ schemaColumn.addChildColumns(Collections.singletonList(arrayChildColumn));
+ arrayChildColumn.setType(typeFromAvro(avroSchema.getElementType(), arrayChildColumn));
+ return TPrimitiveType.ARRAY;
+ case MAP:
+ // The default type of AVRO MAP structure key is STRING
+ SchemaColumn keyChildColumn = new SchemaColumn();
+ keyChildColumn.setType(TPrimitiveType.STRING);
+ SchemaColumn valueChildColumn = new SchemaColumn();
+ valueChildColumn.setType(typeFromAvro(avroSchema.getValueType(), valueChildColumn));
+
+ schemaColumn.addChildColumns(Arrays.asList(keyChildColumn, valueChildColumn));
+ return TPrimitiveType.MAP;
+ case RECORD:
+ List<Field> fields = avroSchema.getFields();
+ List<SchemaColumn> childSchemaColumn = Lists.newArrayList();
+ for (Field field : fields) {
+ SchemaColumn structChildColumn = new SchemaColumn();
+ structChildColumn.setName(field.name());
+ structChildColumn.setType(typeFromAvro(field.schema(), structChildColumn));
+ childSchemaColumn.add(structChildColumn);
+ }
+ schemaColumn.addChildColumns(childSchemaColumn);
+ return TPrimitiveType.STRUCT;
+ case UNION:
+ List<Schema> nonNullableMembers = filterNullableUnion(avroSchema);
+ Preconditions.checkArgument(!nonNullableMembers.isEmpty(),
+ avroSchema.getName() + ": union child types must not all be nullable");
+ List<SchemaColumn> childSchemaColumns = Lists.newArrayList();
+ for (Schema nonNullableMember : nonNullableMembers) {
+ SchemaColumn childColumn = new SchemaColumn();
+ childColumn.setName(nonNullableMember.getName());
+ childColumn.setType(typeFromAvro(nonNullableMember, childColumn));
+ childSchemaColumns.add(childColumn);
+ }
+ schemaColumn.addChildColumns(childSchemaColumns);
+ return TPrimitiveType.STRUCT;
+ default:
+ throw new UnsupportedOperationException(
+ "avro format: " + avroSchema.getName() +
type.getName() + " is not supported.");
+ }
+ }
+
+ private static List<Schema> filterNullableUnion(Schema schema) {
+ Preconditions.checkArgument(schema.isUnion(), "Schema must be union");
+ return schema.getTypes().stream().filter(s -> !s.isNullable()).collect(Collectors.toList());
+ }
+
+}
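
Worth noting for the UNION branch above: after filterNullableUnion() drops the
null members, each remaining member becomes a struct field named by
Schema#getName() of that member, which is why an optional string column shows
up as STRUCT<string:TEXT> in the regression output below. A hedged sketch
(placed in the same package so it can reach the package-visible helper; the
class name is illustrative):

    package org.apache.doris.avro;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class UnionMappingSketch {
        public static void main(String[] args) throws Exception {
            Schema record = SchemaBuilder.record("r")
                    .fields()
                    .name("aUnion").type().optional().stringType()
                    .endRecord();
            // The null branch is filtered out; the string branch becomes a
            // child column named "string", i.e. STRUCT<string:TEXT> in desc.
            System.out.println(AvroTypeUtils.parseTableSchema(record).getTableSchema());
        }
    }
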
diff --git a/fe/be-java-extensions/avro-scanner/src/test/java/org/apache/doris/avro/AvroTypeUtilsTest.java b/fe/be-java-extensions/avro-scanner/src/test/java/org/apache/doris/avro/AvroTypeUtilsTest.java
new file mode 100644
index 00000000000..04f5fd217bf
--- /dev/null
+++ b/fe/be-java-extensions/avro-scanner/src/test/java/org/apache/doris/avro/AvroTypeUtilsTest.java
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.avro;
+
+import org.apache.doris.common.jni.vec.TableSchema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class AvroTypeUtilsTest {
+ private Schema allTypesRecordSchema;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private String result;
+
+ @Before
+ public void setUp() {
+ result = "[{\"name\":\"aBoolean\",\"type\":2,\"childColumns\":null},{\"name\":\"aInt\",\"type\":5,"
+ + "\"childColumns\":null},{\"name\":\"aLong\",\"type\":6,\"childColumns\":null},{\"name\":\""
+ + "aFloat\",\"type\":7,\"childColumns\":null},{\"name\":\"aDouble\",\"type\":8,\"childColumns\""
+ + ":null},{\"name\":\"aString\",\"type\":23,\"childColumns\":null},{\"name\":\"aBytes\",\"type\""
+ + ":11,\"childColumns\":null},{\"name\":\"aFixed\",\"type\":11,\"childColumns\":null},{\"name\""
+ + ":\"anArray\",\"type\":20,\"childColumns\":[{\"name\":null,\"type\":5,\"childColumns\":null}]}"
+ + ",{\"name\":\"aMap\",\"type\":21,\"childColumns\":[{\"name\":null,\"type\":23,\"childColumns\""
+ + ":null},{\"name\":null,\"type\":5,\"childColumns\":null}]},{\"name\":\"anEnum\",\"type\":23"
+ + ",\"childColumns\":null},{\"name\":\"aRecord\",\"type\":22,\"childColumns\":[{\"name\":\"a\","
+ + "\"type\":5,\"childColumns\":null},{\"name\":\"b\",\"type\":8,\"childColumns\":null},{\"name\":"
+ + "\"c\",\"type\":23,\"childColumns\":null}]},{\"name\":\"aUnion\",\"type\":22,\"childColumns\":"
+ + "[{\"name\":\"string\",\"type\":23,\"childColumns\":null}]}]\n";
+
+ Schema simpleEnumSchema = SchemaBuilder.enumeration("myEnumType").symbols("A", "B", "C");
+ Schema simpleRecordSchema = SchemaBuilder.record("simpleRecord")
+ .fields()
+ .name("a")
+ .type().intType().noDefault()
+ .name("b")
+ .type().doubleType().noDefault()
+ .name("c")
+ .type().stringType().noDefault()
+ .endRecord();
+
+ allTypesRecordSchema = SchemaBuilder.builder()
+ .record("all")
+ .fields()
+ .name("aBoolean")
+ .type().booleanType().noDefault()
+ .name("aInt")
+ .type().intType().noDefault()
+ .name("aLong")
+ .type().longType().noDefault()
+ .name("aFloat")
+ .type().floatType().noDefault()
+ .name("aDouble")
+ .type().doubleType().noDefault()
+ .name("aString")
+ .type().stringType().noDefault()
+ .name("aBytes")
+ .type().bytesType().noDefault()
+ .name("aFixed")
+ .type().fixed("myFixedType").size(16).noDefault()
+ .name("anArray")
+ .type().array().items().intType().noDefault()
+ .name("aMap")
+ .type().map().values().intType().noDefault()
+ .name("anEnum")
+ .type(simpleEnumSchema).noDefault()
+ .name("aRecord")
+ .type(simpleRecordSchema).noDefault()
+ .name("aUnion")
+ .type().optional().stringType()
+ .endRecord();
+ }
+
+ @Test
+ public void testParseTableSchema() throws IOException {
+ TableSchema tableSchema = AvroTypeUtils.parseTableSchema(allTypesRecordSchema);
+ String tableSchemaJson = tableSchema.getTableSchema();
+ JsonNode tableSchemaTree = objectMapper.readTree(tableSchemaJson);
+
+ JsonNode resultSchemaTree = objectMapper.readTree(result);
+ Assert.assertEquals(resultSchemaTree, tableSchemaTree);
+ }
+
+}
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java
index 421feb55a3f..9e223d0435f 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/vec/TableSchema.java
@@ -49,7 +49,7 @@ public class TableSchema {
public static class SchemaColumn {
private String name;
private int type;
- private SchemaColumn childColumn;
+ private List<SchemaColumn> childColumns;
public SchemaColumn() {
@@ -59,8 +59,8 @@ public class TableSchema {
return name;
}
- public SchemaColumn getChildColumn() {
- return childColumn;
+ public List<SchemaColumn> getChildColumns() {
+ return childColumns;
}
public int getType() {
@@ -75,8 +75,8 @@ public class TableSchema {
this.type = type.getValue();
}
- public void addChildColumn(SchemaColumn childColumn) {
- this.childColumn = childColumn;
+ public void addChildColumns(List<SchemaColumn> childColumns) {
+ this.childColumns = childColumns;
}
}
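
The childColumn -> childColumns switch above is what lets one column describe
all three nested shapes: a one-element list for ARRAY, a [key, value] pair for
MAP, and one entry per field for STRUCT. A minimal sketch of hand-building the
MAP case and serializing it (assuming getTableSchema() emits the same JSON
shape shown in the AvroTypeUtilsTest fixture above; the class name is
illustrative):

    import org.apache.doris.common.jni.vec.TableSchema;
    import org.apache.doris.common.jni.vec.TableSchema.SchemaColumn;
    import org.apache.doris.thrift.TPrimitiveType;

    import java.util.Arrays;
    import java.util.Collections;

    public class ChildColumnsSketch {
        public static void main(String[] args) throws Exception {
            SchemaColumn key = new SchemaColumn();
            key.setType(TPrimitiveType.STRING); // Avro map keys are always STRING
            SchemaColumn value = new SchemaColumn();
            value.setType(TPrimitiveType.INT);

            SchemaColumn aMap = new SchemaColumn();
            aMap.setName("aMap");
            aMap.setType(TPrimitiveType.MAP);
            aMap.addChildColumns(Arrays.asList(key, value));

            // Expected shape, per the test fixture:
            // [{"name":"aMap","type":21,"childColumns":[{...STRING...},{...INT...}]}]
            System.out.println(new TableSchema(Collections.singletonList(aMap)).getTableSchema());
        }
    }
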
diff --git a/regression-test/data/external_table_p0/tvf/test_tvf_avro.out b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out
new file mode 100644
index 00000000000..8f39bd410c9
--- /dev/null
+++ b/regression-test/data/external_table_p0/tvf/test_tvf_avro.out
@@ -0,0 +1,73 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !1 --
+aBoolean BOOLEAN Yes false \N NONE
+aInt INT Yes false \N NONE
+aLong BIGINT Yes false \N NONE
+aFloat FLOAT Yes false \N NONE
+aDouble DOUBLE Yes false \N NONE
+aString TEXT Yes false \N NONE
+anArray ARRAY<INT> Yes false \N NONE
+aMap MAP<TEXT,INT> Yes false \N NONE
+anEnum TEXT Yes false \N NONE
+aRecord STRUCT<a:INT,b:DOUBLE,c:TEXT> Yes false \N NONE
+aUnion STRUCT<string:TEXT> Yes false \N NONE
+mapArrayLong MAP<TEXT,ARRAY<BIGINT>> Yes false \N NONE
+arrayMapBoolean ARRAY<MAP<TEXT,BOOLEAN>> Yes false \N NONE
+
+-- !2 --
+2
+
+-- !1 --
+false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}]
+true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}]
+
+-- !2 --
+[1, 2, 3, 4]
+[5, 6, 7]
+
+-- !3 --
+{"k1":1, "k2":2}
+{"key1":1, "key2":2}
+
+-- !4 --
+A
+B
+
+-- !5 --
+{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"}
+{"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"}
+
+-- !6 --
+\N
+\N
+
+-- !7 --
+{"k1":[99, 88, 77], "k2":[10, 20]}
+{"k11":[77, 11, 33], "k22":[10, 20]}
+
+-- !8 --
+[{"Key11":1}, {"Key22":0}]
+[{"arrayMapKey1":0}, {"arrayMapKey2":1}]
+
+-- !3 --
+aBoolean BOOLEAN Yes false \N NONE
+aInt INT Yes false \N NONE
+aLong BIGINT Yes false \N NONE
+aFloat FLOAT Yes false \N NONE
+aDouble DOUBLE Yes false \N NONE
+aString TEXT Yes false \N NONE
+anArray ARRAY<INT> Yes false \N NONE
+aMap MAP<TEXT,INT> Yes false \N NONE
+anEnum TEXT Yes false \N NONE
+aRecord STRUCT<a:INT,b:DOUBLE,c:TEXT> Yes false \N NONE
+aUnion STRUCT<string:TEXT> Yes false \N NONE
+mapArrayLong MAP<TEXT,ARRAY<BIGINT>> Yes false \N NONE
+arrayMapBoolean ARRAY<MAP<TEXT,BOOLEAN>> Yes false \N NONE
+
+-- !9 --
+false 100 9999 2.11 9.1102 string test [5, 6, 7] {"k1":1, "k2":2} B {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k11":[77, 11, 33], "k22":[10, 20]} [{"Key11":1}, {"Key22":0}]
+true 42 3400 3.14 9.81 a test string [1, 2, 3, 4] {"key1":1, "key2":2} A {"a": 5, "b": 3.14159265358979, "c": "Simple Record String Field"} \N {"k1":[99, 88, 77], "k2":[10, 20]} [{"arrayMapKey1":0}, {"arrayMapKey2":1}]
+
+-- !4 --
+2
+
diff --git a/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy
new file mode 100644
index 00000000000..6f9b4f98b49
--- /dev/null
+++ b/regression-test/suites/external_table_p0/tvf/test_tvf_avro.groovy
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_tvf_avro", "external,hive,tvf,avro,external_docker") {
+
+ def all_type_file = "all_type.avro";
+ def format = "avro"
+
+ // s3 config
+ String ak = getS3AK()
+ String sk = getS3SK()
+ String s3_endpoint = getS3Endpoint()
+ String region = getS3Region()
+ String bucket = context.config.otherConfigs.get("s3BucketName");
+ def s3Uri = "https://${bucket}.${s3_endpoint}/regression/datalake/pipeline_data/tvf/${all_type_file}";
+
+ // hdfs config
+ String hdfs_port = context.config.otherConfigs.get("hdfs_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def hdfsUserName = "doris"
+ def defaultFS = "hdfs://${externalEnvIp}:${hdfs_port}"
+ def hdfsUri = "${defaultFS}" + "/user/doris/preinstalled_data/avro/avro_all_types/${all_type_file}"
+
+ // TVF s3()
+ qt_1 """
+ desc function s3(
+ "uri" = "${s3Uri}",
+ "ACCESS_KEY" = "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "REGION" = "${region}",
+ "FORMAT" = "${format}");
+ """
+
+ qt_2 """
+ select count(*) from s3(
+ "uri" ="${s3Uri}",
+ "ACCESS_KEY" = "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "REGION" = "${region}",
+ "FORMAT" = "${format}");
+ """
+
+ order_qt_1 """
+ select * from s3(
+ "uri" = "${s3Uri}",
+ "ACCESS_KEY" = "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "REGION" = "${region}",
+ "FORMAT" = "${format}");
+ """
+
+ order_qt_2 """
+ select anArray from s3(
+ "uri" = "${s3Uri}",
+ "ACCESS_KEY" = "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "REGION" = "${region}",
+ "FORMAT" = "${format}");
+ """
+
+ order_qt_3 """
+ select aMap from s3(
+ "uri" = "${s3Uri}",
+ "ACCESS_KEY" = "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "REGION" = "${region}",
+ "FORMAT" = "${format}");
+ """
+
+ order_qt_4 """
+ select anEnum from s3(
+ "uri" = "${s3Uri}",
+ "ACCESS_KEY" = "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "REGION" = "${region}",
+ "FORMAT" = "${format}");
+ """
+
+ order_qt_5 """
+ select aRecord from s3(
+ "uri" = "${s3Uri}",
+ "ACCESS_KEY" = "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "REGION" = "${region}",
+ "FORMAT" = "${format}");
+ """
+
+ order_qt_6 """
+ select aUnion from s3(
+ "uri" = "${s3Uri}",
+ "ACCESS_KEY" = "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "REGION" = "${region}",
+ "FORMAT" = "${format}");
+ """
+
+ order_qt_7 """
+ select mapArrayLong from s3(
+ "uri" ="${s3Uri}",
+ "ACCESS_KEY" = "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "REGION" = "${region}",
+ "FORMAT" = "${format}");
+ """
+
+ order_qt_8 """
+ select arrayMapBoolean from s3(
+ "uri" = "${s3Uri}",
+ "ACCESS_KEY" = "${ak}",
+ "SECRET_KEY" = "${sk}",
+ "REGION" = "${region}",
+ "FORMAT" = "${format}");
+ """
+
+ // TVF hdfs()
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ try {
+ qt_3 """
+ desc function HDFS(
+ "uri" = "${hdfsUri}",
+ "fs.defaultFS" = "${defaultFS}",
+ "hadoop.username" = "${hdfsUserName}",
+ "FORMAT" = "${format}"); """
+
+ order_qt_9 """ select * from HDFS(
+ "uri" = "${hdfsUri}",
+ "fs.defaultFS" = "${defaultFS}",
+ "hadoop.username" = "${hdfsUserName}",
+ "format" = "${format}")"""
+
+ qt_4 """ select count(*) from HDFS(
+ "uri" = "${hdfsUri}",
+ "fs.defaultFS" = "${defaultFS}",
+ "hadoop.username" = "${hdfsUserName}",
+ "format" = "${format}"); """
+ } finally {
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]