Thanks, Jing
Do you know if this problem will be addressed in FLINK-28867<https://issues.apache.org/jira/browse/FLINK-28867>, or does it deserve a separate Jira?

From: Jing Ge <j...@ververica.com>
Date: Tuesday, November 15, 2022 at 3:39 PM
To: Benenson, Michael <michael_benen...@intuit.com>
Cc: user@flink.apache.org <user@flink.apache.org>, Deshpande, Omkar <omkar_deshpa...@intuit.com>, Vora, Jainik <jainik_v...@intuit.com>
Subject: Re: Reading Parquet file with array of structs cause error

Hi Michael,

Currently, ParquetColumnarRowInputFormat does not support schemas with nested columns. If your parquet file stores Avro records, you might want to try e.g. Avro Generic record [1].

[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record

Best regards,
Jing

On Tue, Nov 15, 2022 at 8:52 PM Benenson, Michael via user <user@flink.apache.org<mailto:user@flink.apache.org>> wrote:

Hi, folks

I'm using Flink 1.16.0, and I would like to read a Parquet file (attached) that has schema [1]. I can read this file with Spark, but when I try to read it with Flink 1.16.0 (program attached) using schema [2], I get an IndexOutOfBoundsException [3]. My code and Parquet file are attached.

Is it:
• the problem described in FLINK-28867<https://issues.apache.org/jira/browse/FLINK-28867>, or
• something new that deserves a separate Jira, or
• something wrong with my code?
[1]: Parquet Schema

root
 |-- amount: decimal(38,9) (nullable = true)
 |-- connectionAccountId: string (nullable = true)
 |-- sourceEntity: struct (nullable = true)
 |    |-- extendedProperties: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- key: string (nullable = true)
 |    |    |    |-- value: string (nullable = true)
 |    |-- sourceAccountId: string (nullable = true)
 |    |-- sourceEntityId: string (nullable = true)
 |    |-- sourceEntityType: string (nullable = true)
 |    |-- sourceSystem: string (nullable = true)

[2]: Schema used in Flink:

static RowType getSchema() {
    RowType elementType = RowType.of(
        new LogicalType[] {
            new VarCharType(VarCharType.MAX_LENGTH),
            new VarCharType(VarCharType.MAX_LENGTH)
        },
        new String[] { "key", "value" }
    );

    RowType element = RowType.of(
        new LogicalType[] { elementType },
        new String[] { "element" }
    );

    RowType sourceEntity = RowType.of(
        new LogicalType[] {
            new ArrayType(element),
            new VarCharType(),
            new VarCharType(),
            new VarCharType(),
            new VarCharType()
        },
        new String[] {
            "extendedProperties",
            "sourceAccountId",
            "sourceEntityId",
            "sourceEntityType",
            "sourceSystem"
        }
    );

    return RowType.of(
        new LogicalType[] { new DecimalType(), new VarCharType(), sourceEntity },
        new String[] { "amount", "connectionAccountId", "sourceEntity" }
    );
}

[3]: Execution Exception:

2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
    ...
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
    at java.base/java.util.Objects.checkIndex(Objects.java:372)
    at java.base/java.util.ArrayList.get(ArrayList.java:459)
    at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
    at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
    at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
    at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
    at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    ... 6 common frames omitted

Thanks
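For readers following this thread: Jing's suggested workaround in [1] reads the file as Avro GenericRecords instead of columnar rows, which requires an Avro schema mirroring the Parquet schema above. The sketch below builds such a schema with Avro's SchemaBuilder; it is illustrative only — in particular, "amount" is reduced to a nullable string here, whereas the real file's decimal(38,9) would need an Avro bytes/fixed field with the "decimal" logical type, and the file path in the comment is a placeholder.

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class NestedAvroSchema {

    // Avro approximation of the Parquet schema in [1]: a record with an
    // array-of-struct field ("extendedProperties") inside a nested record.
    static Schema buildSchema() {
        Schema kv = SchemaBuilder.record("Element").fields()
                .optionalString("key")
                .optionalString("value")
                .endRecord();

        Schema sourceEntity = SchemaBuilder.record("SourceEntity").fields()
                .name("extendedProperties").type().array().items(kv).noDefault()
                .optionalString("sourceAccountId")
                .optionalString("sourceEntityId")
                .optionalString("sourceEntityType")
                .optionalString("sourceSystem")
                .endRecord();

        return SchemaBuilder.record("Root").fields()
                .optionalString("amount")              // simplified: really decimal(38,9)
                .optionalString("connectionAccountId")
                .name("sourceEntity").type(sourceEntity).noDefault()
                .endRecord();
    }

    public static void main(String[] args) {
        Schema schema = buildSchema();
        System.out.println(schema.toString(true));

        // With flink-parquet on the classpath, this schema would feed the
        // GenericRecord reader from [1] along these lines (not verified here):
        //
        //   FileSource<GenericRecord> source = FileSource
        //           .forRecordStreamFormat(
        //                   AvroParquetReaders.forGenericRecord(schema),
        //                   new Path("file:///path/to/file.parquet"))  // placeholder
        //           .build();
        //   env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet").print();
    }
}
```

Unlike the RowType in [2], this path avoids ParquetSplitReaderUtil.createWritableColumnVector entirely, so the nested-column limitation does not apply.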