luoyuxia commented on code in PR #20442:
URL: https://github.com/apache/flink/pull/20442#discussion_r938482274


##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java:
##########
@@ -32,18 +32,19 @@
  */
 public class ParquetDecimalVector implements DecimalColumnVector {
 
-    private final ColumnVector vector;
+    public final ColumnVector vector;
 
     public ParquetDecimalVector(ColumnVector vector) {
         this.vector = vector;
     }
 
     @Override
     public DecimalData getDecimal(int i, int precision, int scale) {
-        if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+        if (ParquetSchemaConverter.is32BitDecimal(precision) && vector instanceof IntColumnVector) {
             return DecimalData.fromUnscaledLong(
                     ((IntColumnVector) vector).getInt(i), precision, scale);
-        } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+        } else if (ParquetSchemaConverter.is64BitDecimal(precision)
+                && vector instanceof LongColumnVector) {
             return DecimalData.fromUnscaledLong(
                     ((LongColumnVector) vector).getLong(i), precision, scale);
         } else {

Review Comment:
   Should we also check `vector instanceof BytesColumnVector`?
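
A minimal sketch of what such a check could look like: dispatch on the runtime vector type and fail loudly when nothing matches. The interfaces below are simplified, hypothetical stand-ins for Flink's column vectors (their signatures are assumptions), `BigDecimal` stands in for `DecimalData`, and the precision checks from the diff are left out for brevity.

```java
import java.math.BigDecimal;
import java.math.BigInteger;

// Hypothetical stand-ins for the column vector interfaces; not the real Flink API.
interface ColumnVector {}

interface IntColumnVector extends ColumnVector {
    int getInt(int i);
}

interface LongColumnVector extends ColumnVector {
    long getLong(int i);
}

interface BytesColumnVector extends ColumnVector {
    byte[] getBytes(int i);
}

final class DecimalDispatchSketch {
    private DecimalDispatchSketch() {}

    // Picks the decoding path from the runtime vector type instead of assuming
    // that "not int, not long" implies a bytes vector.
    static BigDecimal getDecimal(ColumnVector vector, int i, int scale) {
        if (vector instanceof IntColumnVector) {
            return BigDecimal.valueOf(((IntColumnVector) vector).getInt(i), scale);
        } else if (vector instanceof LongColumnVector) {
            return BigDecimal.valueOf(((LongColumnVector) vector).getLong(i), scale);
        } else if (vector instanceof BytesColumnVector) {
            // Bytes are treated as a big-endian two's-complement unscaled value.
            byte[] bytes = ((BytesColumnVector) vector).getBytes(i);
            return new BigDecimal(new BigInteger(bytes), scale);
        }
        throw new IllegalArgumentException(
                "Unsupported vector type for decimal: " + vector.getClass().getName());
    }
}
```

With an explicit third branch, an unexpected vector type surfaces as a descriptive exception rather than a `ClassCastException` from a blind cast in the `else` branch.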



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java:
##########
@@ -396,6 +490,48 @@ public static WritableColumnVector createWritableColumnVector(
                             typeName);
                     return new HeapBytesVector(batchSize);
                 }
+            case ARRAY:

Review Comment:
   Should we also add a `checkArgument` here?



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ParquetDataColumnReaderFactory.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.vector.reader;
+
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Parquet file has self-describing schema which may differ from the user required schema (e.g.
+ * schema evolution). This factory is used to retrieve user required typed data via corresponding
+ * reader which reads the underlying data.
+ */
+public final class ParquetDataColumnReaderFactory {
+
+    private ParquetDataColumnReaderFactory() {}
+
+    /** default reader for {@link ParquetDataColumnReader}. */
+    public static class DefaultParquetDataColumnReader implements ParquetDataColumnReader {

Review Comment:
   This class mixes the Dictionary reader and the ValuesReader, which causes many warnings in IDEA such as `Method invocation 'readBoolean' may produce 'NullPointerException'`.
   Would it be better to split it into two classes, one for the ValuesReader and one for the Dictionary reader?
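
One possible shape for such a split, as a rough sketch only: each implementation holds exactly one non-null backing source, so no method has to dereference a field that may be null. All names below are hypothetical, simplified stand-ins (the real `ParquetDataColumnReader` has many more methods), and throwing `UnsupportedOperationException` for the unsupported access mode is an assumption, not something the PR prescribes.

```java
import java.util.Objects;

// Hypothetical stand-in for Parquet's ValuesReader, reduced to one method.
interface BooleanSource {
    boolean next();
}

// Hypothetical stand-in for Parquet's Dictionary, reduced to one method.
interface BooleanDictionary {
    boolean decode(int id);
}

// Hypothetical, heavily reduced version of the reader interface.
interface DataColumnReaderSketch {
    boolean readBoolean();

    boolean readBoolean(int id);
}

// Reads plain (non-dictionary) pages; never touches a dictionary.
final class ValuesBackedReader implements DataColumnReaderSketch {
    private final BooleanSource values;

    ValuesBackedReader(BooleanSource values) {
        this.values = Objects.requireNonNull(values);
    }

    @Override
    public boolean readBoolean() {
        return values.next();
    }

    @Override
    public boolean readBoolean(int id) {
        throw new UnsupportedOperationException("Not a dictionary reader");
    }
}

// Decodes dictionary ids; never touches a values reader.
final class DictionaryBackedReader implements DataColumnReaderSketch {
    private final BooleanDictionary dictionary;

    DictionaryBackedReader(BooleanDictionary dictionary) {
        this.dictionary = Objects.requireNonNull(dictionary);
    }

    @Override
    public boolean readBoolean() {
        throw new UnsupportedOperationException("Not a values reader");
    }

    @Override
    public boolean readBoolean(int id) {
        return dictionary.decode(id);
    }
}
```

Because each class has a single `final`, non-null field, static analysis no longer has a nullable path to warn about.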



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ArrayColumnReader.java:
##########
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.ParquetDecimalVector;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.columnar.vector.heap.HeapArrayVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Array {@link ColumnReader}. */
+public class ArrayColumnReader extends BaseVectorizedColumnReader {
+
+    // The value read in last time
+    private Object lastValue;
+
+    // flag to indicate if there is no data in parquet data page
+    private boolean eof = false;
+
+    // flag to indicate if it's the first time to read parquet data page with this instance
+    boolean isFirstRow = true;
+
+    public ArrayColumnReader(
+            ColumnDescriptor descriptor,
+            PageReader pageReader,
+            boolean isUtcTimestamp,
+            Type type,
+            LogicalType logicalType)
+            throws IOException {
+        super(descriptor, pageReader, isUtcTimestamp, type, logicalType);
+    }
+
+    @Override
+    public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+        HeapArrayVector lcv = (HeapArrayVector) vector;
+        // before readBatch, initial the size of offsets & lengths as the default value,
+        // the actual size will be assigned in setChildrenInfo() after reading complete.
+        lcv.offsets = new long[VectorizedColumnBatch.DEFAULT_SIZE];
+        lcv.lengths = new long[VectorizedColumnBatch.DEFAULT_SIZE];
+
+        LogicalType elementType = ((ArrayType) logicalType).getElementType();
+
+        // read the first row in parquet data page, this will be only happened once for this
+        // instance
+        if (isFirstRow) {
+            if (!fetchNextValue(elementType)) {
+                return;
+            }
+            isFirstRow = false;
+        }
+
+        // Because the length of ListColumnVector.child can't be known now,
+        // the valueList will save all data for ListColumnVector temporary.
+        List<Object> valueList = new ArrayList<>();
+
+        int index = collectDataFromParquetPage(readNumber, lcv, valueList, elementType);
+        // Convert valueList to array for the ListColumnVector.child
+        fillColumnVector(elementType, lcv, valueList, index);
+    }
+
+    /**
+     * Reads a single value from parquet page, puts it into lastValue. Returns a boolean indicating
+     * if there is more values to read (true).
+     *
+     * @param type the element type of array
+     * @return boolean
+     * @throws IOException
+     */
+    private boolean fetchNextValue(LogicalType type) throws IOException {
+        int left = readPageIfNeed();
+        if (left > 0) {
+            // get the values of repetition and definitionLevel
+            readRepetitionAndDefinitionLevels();
+            // read the data if it isn't null
+            if (definitionLevel == maxDefLevel) {
+                if (isCurrentPageDictionaryEncoded) {
+                    lastValue = dataColumn.readValueDictionaryId();
+                } else {
+                    lastValue = readPrimitiveTypedRow(type);
+                }
+            } else {
+                lastValue = null;
+            }
+            return true;
+        } else {
+            eof = true;
+            return false;
+        }
+    }
+
+    private int readPageIfNeed() throws IOException {
+        // Compute the number of values we want to read in this page.
+        int leftInPage = (int) (endOfPageValueCount - valuesRead);
+        if (leftInPage == 0) {
+            // no data left in current page, load data from new page
+            readPage();
+            leftInPage = (int) (endOfPageValueCount - valuesRead);
+        }
+        return leftInPage;
+    }
+
+    // Need to be in consistent with that VectorizedPrimitiveColumnReader#readBatchHelper
+    // TODO Reduce the duplicated code
+    private Object readPrimitiveTypedRow(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+                return dataColumn.readString();
+            case BOOLEAN:
+                return dataColumn.readBoolean();
+            case TIME_WITHOUT_TIME_ZONE:
+            case DATE:
+            case INTEGER:
+                return dataColumn.readInteger();
+            case TINYINT:
+                return dataColumn.readTinyInt();
+            case SMALLINT:
+                return dataColumn.readSmallInt();
+            case BIGINT:
+                return dataColumn.readLong();
+            case FLOAT:
+                return dataColumn.readFloat();
+            case DOUBLE:
+                return dataColumn.readDouble();
+            case DECIMAL:
+                switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+                    case INT32:
+                        return dataColumn.readInteger();
+                    case INT64:
+                        return dataColumn.readLong();
+                    case BINARY:
+                    case FIXED_LEN_BYTE_ARRAY:
+                        return dataColumn.readString();
+                }
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return dataColumn.readTimestamp();
+            default:
+                throw new RuntimeException("Unsupported type in the list: " + type);
+        }
+    }
+
+    private Object dictionaryDecodeValue(LogicalType type, Integer dictionaryValue) {
+        if (dictionaryValue == null) {
+            return null;
+        }
+
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+                return dictionary.readString(dictionaryValue);
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case INTEGER:
+                return dictionary.readInteger(dictionaryValue);
+            case BOOLEAN:
+                return dictionary.readBoolean(dictionaryValue) ? 1 : 0;
+            case DOUBLE:
+                return dictionary.readDouble(dictionaryValue);
+            case FLOAT:
+                return dictionary.readFloat(dictionaryValue);
+            case TINYINT:
+                return dictionary.readTinyInt(dictionaryValue);
+            case SMALLINT:
+                return dictionary.readSmallInt(dictionaryValue);
+            case BIGINT:
+                return dictionary.readLong(dictionaryValue);
+            case DECIMAL:
+                switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+                    case INT32:
+                        return dictionary.readInteger(dictionaryValue);
+                    case INT64:
+                        return dictionary.readLong(dictionaryValue);
+                    case FIXED_LEN_BYTE_ARRAY:
+                    case BINARY:
+                        return dictionary.readString(dictionaryValue);

Review Comment:
   ditto



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java:
##########
@@ -257,56 +271,136 @@ public static ColumnVector createVectorFromConstant(
         }
     }
 
+    private static List<ColumnDescriptor> getAllColumnDescriptorByType(
+            int depth, Type type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
+        List<ColumnDescriptor> res = new ArrayList<>();
+        for (ColumnDescriptor descriptor : columns) {
+            if (depth >= descriptor.getPath().length) {
+                throw new InvalidSchemaException("Corrupted Parquet schema");
+            }
+            if (type.getName().equals(descriptor.getPath()[depth])) {
+                res.add(descriptor);
+            }
+        }
+        return res;
+    }
+
     public static ColumnReader createColumnReader(
-            boolean utcTimestamp,
+            boolean isUtcTimestamp,
             LogicalType fieldType,
-            ColumnDescriptor descriptor,
-            PageReader pageReader)
+            Type type,
+            List<ColumnDescriptor> columnDescriptors,
+            PageReadStore pages,
+            int depth)
             throws IOException {
+        List<ColumnDescriptor> descriptors =
+                getAllColumnDescriptorByType(depth, type, columnDescriptors);

Review Comment:
   Check the length of `descriptors`?
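
A sketch of the kind of check this could be, under stated assumptions: column paths are modelled as plain `String[]` (a stand-in for `ColumnDescriptor#getPath()`), the class and method names are hypothetical, and rejecting an empty result with `IllegalArgumentException` is one possible policy rather than what the PR does.

```java
import java.util.ArrayList;
import java.util.List;

final class DescriptorLookupSketch {
    private DescriptorLookupSketch() {}

    // Returns every column path whose segment at `depth` equals `fieldName`,
    // and rejects the lookup if nothing matched.
    static List<String[]> matchingPaths(int depth, String fieldName, List<String[]> paths) {
        List<String[]> res = new ArrayList<>();
        for (String[] path : paths) {
            if (depth >= path.length) {
                throw new IllegalStateException("Corrupted Parquet schema");
            }
            if (fieldName.equals(path[depth])) {
                res.add(path);
            }
        }
        if (res.isEmpty()) {
            // Fail here with context instead of failing later on descriptors.get(0).
            throw new IllegalArgumentException(
                    "No column found for field '" + fieldName + "' at depth " + depth);
        }
        return res;
    }
}
```

Validating right after the lookup keeps the error message close to the cause: the requested field name and depth are still in scope.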



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ArrayColumnReader.java:
##########
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.vector.reader;
+
+import org.apache.flink.formats.parquet.vector.ParquetDecimalVector;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.columnar.vector.heap.HeapArrayVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/** Array {@link ColumnReader}. */
+public class ArrayColumnReader extends BaseVectorizedColumnReader {
+
+    // The value read in last time
+    private Object lastValue;
+
+    // flag to indicate if there is no data in parquet data page
+    private boolean eof = false;
+
+    // flag to indicate if it's the first time to read parquet data page with this instance
+    boolean isFirstRow = true;
+
+    public ArrayColumnReader(
+            ColumnDescriptor descriptor,
+            PageReader pageReader,
+            boolean isUtcTimestamp,
+            Type type,
+            LogicalType logicalType)
+            throws IOException {
+        super(descriptor, pageReader, isUtcTimestamp, type, logicalType);
+    }
+
+    @Override
+    public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+        HeapArrayVector lcv = (HeapArrayVector) vector;
+        // before readBatch, initial the size of offsets & lengths as the default value,
+        // the actual size will be assigned in setChildrenInfo() after reading complete.
+        lcv.offsets = new long[VectorizedColumnBatch.DEFAULT_SIZE];
+        lcv.lengths = new long[VectorizedColumnBatch.DEFAULT_SIZE];
+
+        LogicalType elementType = ((ArrayType) logicalType).getElementType();
+
+        // read the first row in parquet data page, this will be only happened once for this
+        // instance
+        if (isFirstRow) {
+            if (!fetchNextValue(elementType)) {
+                return;
+            }
+            isFirstRow = false;
+        }
+
+        // Because the length of ListColumnVector.child can't be known now,
+        // the valueList will save all data for ListColumnVector temporary.
+        List<Object> valueList = new ArrayList<>();
+
+        int index = collectDataFromParquetPage(readNumber, lcv, valueList, elementType);
+        // Convert valueList to array for the ListColumnVector.child
+        fillColumnVector(elementType, lcv, valueList, index);
+    }
+
+    /**
+     * Reads a single value from parquet page, puts it into lastValue. Returns a boolean indicating
+     * if there is more values to read (true).
+     *
+     * @param type the element type of array
+     * @return boolean
+     * @throws IOException
+     */
+    private boolean fetchNextValue(LogicalType type) throws IOException {
+        int left = readPageIfNeed();
+        if (left > 0) {
+            // get the values of repetition and definitionLevel
+            readRepetitionAndDefinitionLevels();
+            // read the data if it isn't null
+            if (definitionLevel == maxDefLevel) {
+                if (isCurrentPageDictionaryEncoded) {
+                    lastValue = dataColumn.readValueDictionaryId();
+                } else {
+                    lastValue = readPrimitiveTypedRow(type);
+                }
+            } else {
+                lastValue = null;
+            }
+            return true;
+        } else {
+            eof = true;
+            return false;
+        }
+    }
+
+    private int readPageIfNeed() throws IOException {
+        // Compute the number of values we want to read in this page.
+        int leftInPage = (int) (endOfPageValueCount - valuesRead);
+        if (leftInPage == 0) {
+            // no data left in current page, load data from new page
+            readPage();
+            leftInPage = (int) (endOfPageValueCount - valuesRead);
+        }
+        return leftInPage;
+    }
+
+    // Need to be in consistent with that VectorizedPrimitiveColumnReader#readBatchHelper
+    // TODO Reduce the duplicated code
+    private Object readPrimitiveTypedRow(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+                return dataColumn.readString();
+            case BOOLEAN:
+                return dataColumn.readBoolean();
+            case TIME_WITHOUT_TIME_ZONE:
+            case DATE:
+            case INTEGER:
+                return dataColumn.readInteger();
+            case TINYINT:
+                return dataColumn.readTinyInt();
+            case SMALLINT:
+                return dataColumn.readSmallInt();
+            case BIGINT:
+                return dataColumn.readLong();
+            case FLOAT:
+                return dataColumn.readFloat();
+            case DOUBLE:
+                return dataColumn.readDouble();
+            case DECIMAL:
+                switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+                    case INT32:
+                        return dataColumn.readInteger();
+                    case INT64:
+                        return dataColumn.readLong();
+                    case BINARY:
+                    case FIXED_LEN_BYTE_ARRAY:
+                        return dataColumn.readString();

Review Comment:
   dataColumn.readBytes()?
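
To illustrate why a bytes-oriented accessor fits decimals better than a string-oriented one: Parquet stores `BINARY` and `FIXED_LEN_BYTE_ARRAY` decimals as a big-endian two's-complement unscaled value, which is generally not valid text. The sketch below uses only the JDK; the class and method names are hypothetical and `BigDecimal` stands in for Flink's `DecimalData`.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class DecimalBytesSketch {
    private DecimalBytesSketch() {}

    // Interprets the bytes as a big-endian two's-complement unscaled value.
    static BigDecimal decodeDecimal(byte[] unscaledBigEndian, int scale) {
        return new BigDecimal(new BigInteger(unscaledBigEndian), scale);
    }

    // Returns true only if the bytes survive a round trip through a UTF-8 String.
    static boolean survivesStringRoundTrip(byte[] bytes) {
        byte[] roundTripped =
                new String(bytes, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
        return Arrays.equals(bytes, roundTripped);
    }

    public static void main(String[] args) {
        byte[] raw = {(byte) 0xFF, (byte) 0x85}; // unscaled value -123
        System.out.println(decodeDecimal(raw, 2));
        System.out.println(survivesStringRoundTrip(raw));
    }
}
```

The raw bytes decode cleanly as a decimal but are corrupted by a trip through `String`, so any accessor that returns or implies text is a poor fit for this branch.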



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ParquetDataColumnReader.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.vector.reader;
+
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.column.Dictionary;
+
+import java.io.IOException;
+
+/**
+ * The interface to wrap the underlying Parquet dictionary and non dictionary encoded page reader.
+ */
+public interface ParquetDataColumnReader {
+
+    /**
+     * Initialize the reader by page data.
+     *
+     * @param valueCount value count
+     * @param in page data
+     * @throws IOException
+     */
+    void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;
+
+    /** @return the next Dictionary ID from the page */
+    int readValueDictionaryId();
+
+    /** @return the next Long from the page */
+    long readLong();
+
+    /** @return the next Integer from the page */
+    int readInteger();
+
+    /** @return the next SmallInt from the page */
+    int readSmallInt();
+
+    /** @return the next TinyInt from the page */
+    int readTinyInt();
+
+    /** @return the next Float from the page */
+    float readFloat();
+
+    /** @return the next Boolean from the page */
+    boolean readBoolean();
+
+    /** @return the next String from the page */
+    byte[] readString();

Review Comment:
   String readString()?



##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -85,45 +91,43 @@ public class ParquetColumnarRowInputFormatTest {
     private static final org.apache.flink.configuration.Configuration EMPTY_CONF =
             new org.apache.flink.configuration.Configuration();
 
-    private static final MessageType PARQUET_SCHEMA =
-            new MessageType(
-                    "TOP",
-                    primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).named("f0"),
-                    primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).named("f1"),
-                    primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("f2"),
-                    primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("f3"),
-                    primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("f4"),
-                    primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).named("f5"),
-                    primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).named("f6"),
-                    primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).named("f7"),
-                    primitive(PrimitiveTypeName.INT96, Repetition.OPTIONAL).named("f8"),
-                    primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
-                            .precision(5)
-                            .as(OriginalType.DECIMAL)
-                            .named("f9"),
-                    primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL)
-                            .precision(15)
-                            .as(OriginalType.DECIMAL)
-                            .named("f10"),
-                    primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL)
-                            .precision(20)
-                            .as(OriginalType.DECIMAL)
-                            .named("f11"),
-                    primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL)
-                            .length(16)
-                            .precision(5)
-                            .as(OriginalType.DECIMAL)
-                            .named("f12"),
-                    primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL)
-                            .length(16)
-                            .precision(15)
-                            .as(OriginalType.DECIMAL)
-                            .named("f13"),
-                    primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL)
-                            .length(16)
-                            .precision(20)
-                            .as(OriginalType.DECIMAL)
-                            .named("f14"));
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new VarCharType(VarCharType.MAX_LENGTH),
+                    new BooleanType(),
+                    new TinyIntType(),
+                    new SmallIntType(),
+                    new IntType(),
+                    new BigIntType(),
+                    new FloatType(),
+                    new DoubleType(),
+                    new TimestampType(9),
+                    new DecimalType(5, 0),

Review Comment:
   Also test a DecimalType with a non-zero scale, like `DecimalType(5, 2)`?
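
A small JDK-only illustration of why a scale of 0 can hide bugs: a decoder that drops the scale gives the right answer for `(5, 0)` and the wrong one for `(5, 2)`. The class and both methods are hypothetical; the second one is deliberately buggy to make the point.

```java
import java.math.BigDecimal;

final class DecimalScaleSketch {
    private DecimalScaleSketch() {}

    // Correct decoding: the unscaled value is interpreted together with the scale.
    static BigDecimal fromUnscaled(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale);
    }

    // Deliberately buggy decoding that ignores the scale argument.
    static BigDecimal fromUnscaledIgnoringScale(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled);
    }

    public static void main(String[] args) {
        // With scale 0 the two decoders agree, so a test cannot tell them apart.
        System.out.println(fromUnscaled(12345, 0).equals(fromUnscaledIgnoringScale(12345, 0)));
        // With scale 2 they diverge, which is what a scaled test case would catch.
        System.out.println(fromUnscaled(12345, 2).equals(fromUnscaledIgnoringScale(12345, 2)));
    }
}
```

Adding at least one column with a non-zero scale makes the test sensitive to this class of mistake.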



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/columnar/vector/heap/AbstractHeapVector.java:
##########
@@ -48,8 +48,11 @@ public abstract class AbstractHeapVector extends AbstractWritableVector {
     /** Reusable column for ids of dictionary. */
     protected HeapIntVector dictionaryIds;
 
+    private int len;

Review Comment:
   nit:
   ```suggestion
       private final int len;
   ```



##########
flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetColumnarRowInputFormatTest.java:
##########
@@ -85,45 +91,43 @@ public class ParquetColumnarRowInputFormatTest {
     private static final org.apache.flink.configuration.Configuration EMPTY_CONF =
             new org.apache.flink.configuration.Configuration();
 
-    private static final MessageType PARQUET_SCHEMA =
-            new MessageType(
-                    "TOP",
-                    primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).named("f0"),
-                    primitive(PrimitiveTypeName.BOOLEAN, Repetition.OPTIONAL).named("f1"),
-                    primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("f2"),
-                    primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("f3"),
-                    primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL).named("f4"),
-                    primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL).named("f5"),
-                    primitive(PrimitiveTypeName.FLOAT, Repetition.OPTIONAL).named("f6"),
-                    primitive(PrimitiveTypeName.DOUBLE, Repetition.OPTIONAL).named("f7"),
-                    primitive(PrimitiveTypeName.INT96, Repetition.OPTIONAL).named("f8"),
-                    primitive(PrimitiveTypeName.INT32, Repetition.OPTIONAL)
-                            .precision(5)
-                            .as(OriginalType.DECIMAL)
-                            .named("f9"),
-                    primitive(PrimitiveTypeName.INT64, Repetition.OPTIONAL)
-                            .precision(15)
-                            .as(OriginalType.DECIMAL)
-                            .named("f10"),
-                    primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL)
-                            .precision(20)
-                            .as(OriginalType.DECIMAL)
-                            .named("f11"),
-                    primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL)
-                            .length(16)
-                            .precision(5)
-                            .as(OriginalType.DECIMAL)
-                            .named("f12"),
-                    primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL)
-                            .length(16)
-                            .precision(15)
-                            .as(OriginalType.DECIMAL)
-                            .named("f13"),
-                    primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, Repetition.OPTIONAL)
-                            .length(16)
-                            .precision(20)
-                            .as(OriginalType.DECIMAL)
-                            .named("f14"));
+    private static final RowType ROW_TYPE =
+            RowType.of(
+                    new VarCharType(VarCharType.MAX_LENGTH),
+                    new BooleanType(),
+                    new TinyIntType(),
+                    new SmallIntType(),
+                    new IntType(),
+                    new BigIntType(),
+                    new FloatType(),
+                    new DoubleType(),
+                    new TimestampType(9),
+                    new DecimalType(5, 0),
+                    new DecimalType(15, 0),
+                    new DecimalType(20, 0),
+                    new DecimalType(5, 0),
+                    new DecimalType(15, 0),
+                    new DecimalType(20, 0),
+                    new ArrayType(new VarCharType(VarCharType.MAX_LENGTH)),
+                    new ArrayType(new BooleanType()),
+                    new ArrayType(new TinyIntType()),
+                    new ArrayType(new SmallIntType()),
+                    new ArrayType(new IntType()),
+                    new ArrayType(new BigIntType()),
+                    new ArrayType(new FloatType()),
+                    new ArrayType(new DoubleType()),

Review Comment:
   Add a test like `new ArrayType(new ArrayType(new DoubleType()))`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
