Hi Michael,

Currently, ParquetColumnarRowInputFormat does not support schemas with nested columns. If your Parquet file stores Avro records, you might want to try e.g. reading it as Avro GenericRecords [1]; a rough sketch follows below.
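This is only a minimal sketch along the lines of the documented GenericRecord example, assuming the flink-parquet and Avro dependencies are on the classpath; the Avro schema string, the file path, and the class name are placeholders you would replace with your own:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParquetAvroGenericRead {

    public static void main(String[] args) throws Exception {
        // Placeholder Avro schema: the real one must describe all fields of
        // your file, including the nested sourceEntity record and the
        // extendedProperties array.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Entry\",\"fields\":["
                        + "{\"name\":\"connectionAccountId\",\"type\":[\"null\",\"string\"],\"default\":null}"
                        + "]}");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // AvroParquetReaders produces GenericRecords, so nested groups are
        // handled by Avro rather than by the vectorized columnar reader.
        FileSource<GenericRecord> source = FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(schema),
                        new Path("/path/to/your/file.parquet")) // placeholder path
                .build();

        DataStream<GenericRecord> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source");

        stream.print();
        env.execute("read-parquet-as-generic-record");
    }
}

Since the source produces GenericRecords, the nested sourceEntity fields would then be accessed via record.get(...) rather than through the columnar RowData path.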
[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record

Best regards,
Jing

On Tue, Nov 15, 2022 at 8:52 PM Benenson, Michael via user <user@flink.apache.org> wrote:

> Hi, folks
>
> I'm using Flink 1.16.0, and I would like to read a Parquet file (attached) that has schema [1].
>
> I could read this file with Spark, but when I try to read it with Flink 1.16.0 (program attached) using schema [2], I get an IndexOutOfBoundsException [3].
>
> My code and Parquet file are attached. Is it:
>
> · the problem described in FLINK-28867 <https://issues.apache.org/jira/browse/FLINK-28867>, or
> · something new that deserves a separate Jira, or
> · something wrong with my code?
>
> [1]: Parquet schema
>
> root
>  |-- amount: decimal(38,9) (nullable = true)
>  |-- connectionAccountId: string (nullable = true)
>  |-- sourceEntity: struct (nullable = true)
>  |    |-- extendedProperties: array (nullable = true)
>  |    |    |-- element: struct (containsNull = true)
>  |    |    |    |-- key: string (nullable = true)
>  |    |    |    |-- value: string (nullable = true)
>  |    |-- sourceAccountId: string (nullable = true)
>  |    |-- sourceEntityId: string (nullable = true)
>  |    |-- sourceEntityType: string (nullable = true)
>  |    |-- sourceSystem: string (nullable = true)
>
> [2]: Schema used in Flink
>
> static RowType getSchema()
> {
>     RowType elementType = RowType.of(
>             new LogicalType[] {
>                 new VarCharType(VarCharType.MAX_LENGTH),
>                 new VarCharType(VarCharType.MAX_LENGTH)
>             },
>             new String[] { "key", "value" });
>
>     RowType element = RowType.of(
>             new LogicalType[] { elementType },
>             new String[] { "element" });
>
>     RowType sourceEntity = RowType.of(
>             new LogicalType[] {
>                 new ArrayType(element),
>                 new VarCharType(),
>                 new VarCharType(),
>                 new VarCharType(),
>                 new VarCharType()
>             },
>             new String[] {
>                 "extendedProperties",
>                 "sourceAccountId",
>                 "sourceEntityId",
>                 "sourceEntityType",
>                 "sourceSystem"
>             });
>
>     return RowType.of(
>             new LogicalType[] {
>                 new DecimalType(),
>                 new VarCharType(),
>                 sourceEntity
>             },
>             new String[] {
>                 "amount",
>                 "connectionAccountId",
>                 "sourceEntity"
>             });
> }
>
> [3]: Execution exception
>
> 2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received uncaught exception.
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>     ...
> Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>     at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>     at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>     at java.base/java.util.Objects.checkIndex(Objects.java:372)
>     at java.base/java.util.ArrayList.get(ArrayList.java:459)
>     at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
>     at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
>     at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>     ... 6 common frames omitted
>
> Thanks