Hi Michael,

Yes, it will be addressed in FLINK-28867.
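In the meantime, if the file stores Avro records, the GenericRecord route suggested below in the thread might be worth a try. A rough, untested sketch (it assumes the flink-parquet and flink-avro dependencies are on the classpath; the input path is a placeholder, and the Avro schema is abridged, a complete one would mirror the full Parquet schema [1] quoted below):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ReadParquetAsGenericRecords {
        public static void main(String[] args) throws Exception {
            // Abridged Avro schema: only connectionAccountId and the nested
            // sourceEntity.extendedProperties array of {key, value} structs
            // are shown here.
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Entry\",\"fields\":["
                + "{\"name\":\"connectionAccountId\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"sourceEntity\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"SourceEntity\",\"fields\":["
                + "{\"name\":\"extendedProperties\",\"type\":[\"null\",{\"type\":\"array\",\"items\":"
                + "{\"type\":\"record\",\"name\":\"Property\",\"fields\":["
                + "{\"name\":\"key\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"value\",\"type\":[\"null\",\"string\"],\"default\":null}"
                + "]}}],\"default\":null}"
                + "]}],\"default\":null}"
                + "]}");

            // Read the Parquet file as Avro GenericRecords instead of RowData.
            FileSource<GenericRecord> source =
                FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(schema),
                        new Path("file:///tmp/input.parquet"))   // placeholder path
                    .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<GenericRecord> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-avro-source");
            stream.print();
            env.execute("read-parquet-as-generic-records");
        }
    }

From there you can map each GenericRecord to whatever type your pipeline needs.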
Best regards,
Jing

On Wed, Nov 16, 2022 at 2:58 AM liu ron <ron9....@gmail.com> wrote:

> It will be addressed in FLINK-28867.
>
> Best,
> Ron
>
> Benenson, Michael via user <user@flink.apache.org> wrote on Wed, Nov 16, 2022 at 08:47:
>
>> Thanks, Jing
>>
>> Do you know if this problem will be addressed in FLINK-28867
>> <https://issues.apache.org/jira/browse/FLINK-28867>, or does it deserve a
>> separate Jira?
>>
>> From: Jing Ge <j...@ververica.com>
>> Date: Tuesday, November 15, 2022 at 3:39 PM
>> To: Benenson, Michael <michael_benen...@intuit.com>
>> Cc: user@flink.apache.org <user@flink.apache.org>, Deshpande, Omkar
>> <omkar_deshpa...@intuit.com>, Vora, Jainik <jainik_v...@intuit.com>
>> Subject: Re: Reading Parquet file with array of structs cause error
>>
>> Hi Michael,
>>
>> Currently, ParquetColumnarRowInputFormat does not support schemas with
>> nested columns. If your parquet file stores Avro records, you might want
>> to try e.g. Avro GenericRecord [1].
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record
>>
>> Best regards,
>>
>> Jing
>>
>> On Tue, Nov 15, 2022 at 8:52 PM Benenson, Michael via user
>> <user@flink.apache.org> wrote:
>>
>> Hi, folks
>>
>> I'm using Flink 1.16.0, and I would like to read the attached Parquet
>> file, which has schema [1].
>>
>> I can read this file with Spark, but when I try to read it with Flink
>> 1.16.0 (program attached) using schema [2], I get an
>> IndexOutOfBoundsException [3].
>>
>> My code and parquet file are attached. Is it:
>>
>> · the problem described in FLINK-28867
>> <https://issues.apache.org/jira/browse/FLINK-28867>, or
>> · something new that deserves a separate Jira, or
>> · something wrong with my code?
>>
>> [1]: Parquet Schema
>>
>> root
>>  |-- amount: decimal(38,9) (nullable = true)
>>  |-- connectionAccountId: string (nullable = true)
>>  |-- sourceEntity: struct (nullable = true)
>>  |    |-- extendedProperties: array (nullable = true)
>>  |    |    |-- element: struct (containsNull = true)
>>  |    |    |    |-- key: string (nullable = true)
>>  |    |    |    |-- value: string (nullable = true)
>>  |    |-- sourceAccountId: string (nullable = true)
>>  |    |-- sourceEntityId: string (nullable = true)
>>  |    |-- sourceEntityType: string (nullable = true)
>>  |    |-- sourceSystem: string (nullable = true)
>>
>> [2]: Schema used in Flink:
>>
>> import org.apache.flink.table.types.logical.*;  // ArrayType, DecimalType, LogicalType, RowType, VarCharType
>>
>> static RowType getSchema()
>> {
>>     // row type of one entry in the extendedProperties array: {key, value}
>>     RowType elementType = RowType.of(
>>         new LogicalType[] {
>>             new VarCharType(VarCharType.MAX_LENGTH),
>>             new VarCharType(VarCharType.MAX_LENGTH)
>>         },
>>         new String[] { "key", "value" }
>>     );
>>
>>     // wrapper row named "element", mirroring the list layout in [1]
>>     RowType element = RowType.of(
>>         new LogicalType[] { elementType },
>>         new String[] { "element" }
>>     );
>>
>>     RowType sourceEntity = RowType.of(
>>         new LogicalType[] {
>>             new ArrayType(element),
>>             new VarCharType(),
>>             new VarCharType(),
>>             new VarCharType(),
>>             new VarCharType()
>>         },
>>         new String[] {
>>             "extendedProperties",
>>             "sourceAccountId",
>>             "sourceEntityId",
>>             "sourceEntityType",
>>             "sourceSystem"
>>         }
>>     );
>>
>>     return RowType.of(
>>         new LogicalType[] {
>>             new DecimalType(38, 9),  // decimal(38,9) per the Parquet schema [1]
>>             new VarCharType(),
>>             sourceEntity
>>         },
>>         new String[] {
>>             "amount",
>>             "connectionAccountId",
>>             "sourceEntity"
>>         });
>> }
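>>
>> For context, schematically the read is set up along these lines (a
>> simplified sketch, not the attached program itself; the input path is a
>> placeholder):
>>
>>     import org.apache.flink.api.common.eventtime.WatermarkStrategy;
>>     import org.apache.flink.connector.file.src.FileSource;
>>     import org.apache.flink.connector.file.src.FileSourceSplit;
>>     import org.apache.flink.core.fs.Path;
>>     import org.apache.flink.formats.parquet.ParquetColumnarRowInputFormat;
>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>     import org.apache.flink.table.data.RowData;
>>     import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
>>     import org.apache.flink.table.types.logical.RowType;
>>     import org.apache.hadoop.conf.Configuration;
>>
>>     public class ReadNestedParquet {
>>         public static void main(String[] args) throws Exception {
>>             RowType rowType = getSchema();   // the RowType from [2] above
>>
>>             ParquetColumnarRowInputFormat<FileSourceSplit> format =
>>                 new ParquetColumnarRowInputFormat<>(
>>                     new Configuration(),           // Hadoop configuration
>>                     rowType,                       // projected row type
>>                     InternalTypeInfo.of(rowType),  // produced TypeInformation<RowData>
>>                     500,                           // batch size
>>                     false,                         // isUtcTimestamp
>>                     true);                         // isCaseSensitive
>>
>>             FileSource<RowData> source =
>>                 FileSource.forBulkFileFormat(format, new Path("file:///tmp/input.parquet"))
>>                     .build();
>>
>>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>             env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-source").print();
>>             env.execute();
>>         }
>>     }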
>>
>> [3]: Execution Exception:
>>
>> 2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received uncaught exception.
>> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>>     ...
>> Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>>     at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>     at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>>     at java.base/java.util.Objects.checkIndex(Objects.java:372)
>>     at java.base/java.util.ArrayList.get(ArrayList.java:459)
>>     at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
>>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
>>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
>>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
>>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
>>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
>>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
>>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
>>     at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
>>     at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
>>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>>     ... 6 common frames omitted
>>
>> Thanks