Hi Michael,

Yes, it will be addressed in FLINK-28867.

Best regards,
Jing
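
P.S. For anyone trying the Avro GenericRecord route mentioned further down this thread: that reader needs an Avro schema. Below is a rough sketch of one that mirrors the Parquet schema [1] from the original message, built with plain Java strings so it has no dependencies. The record names (Transaction, SourceEntity, ExtendedProperty), the class and helper names, and the choice of bytes for the decimal are my assumptions and are not taken from the attached file. The Avro schema that actually matches depends on how the file was written, so please check it against the file before relying on it.

```java
// Sketch only: builds an Avro schema (as JSON) that mirrors the Parquet schema [1]
// quoted in this thread. All record names here are hypothetical.
public class AvroSchemaSketch {

    // A nullable string field: union of null and string, defaulting to null.
    static String nullableString(String name) {
        return "{\"name\":\"" + name + "\",\"type\":[\"null\",\"string\"],\"default\":null}";
    }

    static String schemaJson() {
        // Array element: struct<key: string, value: string>
        String property = "{\"type\":\"record\",\"name\":\"ExtendedProperty\",\"fields\":["
                + nullableString("key") + "," + nullableString("value") + "]}";

        // extendedProperties: nullable array whose elements may themselves be null
        String extendedProperties = "{\"name\":\"extendedProperties\",\"type\":[\"null\","
                + "{\"type\":\"array\",\"items\":[\"null\"," + property + "]}],\"default\":null}";

        // sourceEntity: nullable struct holding the array and four string fields
        String sourceEntity = "{\"name\":\"sourceEntity\",\"type\":[\"null\","
                + "{\"type\":\"record\",\"name\":\"SourceEntity\",\"fields\":["
                + extendedProperties + ","
                + nullableString("sourceAccountId") + ","
                + nullableString("sourceEntityId") + ","
                + nullableString("sourceEntityType") + ","
                + nullableString("sourceSystem")
                + "]}],\"default\":null}";

        // amount: decimal(38,9) as Avro's decimal logical type.
        // Assumption: backed by bytes; a file may use a fixed-size type instead.
        String amount = "{\"name\":\"amount\",\"type\":[\"null\","
                + "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":38,\"scale\":9}],"
                + "\"default\":null}";

        return "{\"type\":\"record\",\"name\":\"Transaction\",\"fields\":["
                + amount + "," + nullableString("connectionAccountId") + "," + sourceEntity + "]}";
    }

    public static void main(String[] args) {
        System.out.println(schemaJson());
    }
}
```

The resulting string could then be parsed with Avro's `new Schema.Parser().parse(json)` and handed to `AvroParquetReaders.forGenericRecord(schema)`, as described in the Flink documentation linked in the quoted message.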


On Wed, Nov 16, 2022 at 2:58 AM liu ron <ron9....@gmail.com> wrote:

> It will be addressed in FLINK-28867.
>
> Best,
> Ron
>
> On Wed, Nov 16, 2022 at 08:47, Benenson, Michael via user <user@flink.apache.org> wrote:
>
>> Thanks, Jing
>>
>>
>>
>> Do you know whether this problem will be addressed in FLINK-28867
>> <https://issues.apache.org/jira/browse/FLINK-28867>, or does it deserve a
>> separate Jira?
>>
>>
>>
>>
>>
>> *From: *Jing Ge <j...@ververica.com>
>> *Date: *Tuesday, November 15, 2022 at 3:39 PM
>> *To: *Benenson, Michael <michael_benen...@intuit.com>
>> *Cc: *user@flink.apache.org <user@flink.apache.org>, Deshpande, Omkar <
>> omkar_deshpa...@intuit.com>, Vora, Jainik <jainik_v...@intuit.com>
>> *Subject: *Re: Reading Parquet file with array of structs cause error
>>
>>
>>
>>
>> Hi Michael,
>>
>>
>>
>> Currently, ParquetColumnarRowInputFormat does not support schemas with
>> nested columns. If your Parquet file stores Avro records, you might want to
>> try reading it as Avro GenericRecord [1].
>>
>>
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record
>>
>>
>>
>> Best regards,
>>
>> Jing
>>
>>
>>
>>
>>
>> On Tue, Nov 15, 2022 at 8:52 PM Benenson, Michael via user <
>> user@flink.apache.org> wrote:
>>
>> Hi, folks
>>
>>
>>
>> I’m using Flink 1.16.0, and I would like to read a Parquet file (attached)
>> that has the schema shown in [1].
>>
>> I can read this file with Spark, but when I try to read it with Flink
>> 1.16.0 (program attached) using the schema in [2],
>> I get an IndexOutOfBoundsException [3].
>>
>>
>>
>> My code and the Parquet file are attached. Is it:
>>
>> · the problem described in FLINK-28867
>>   <https://issues.apache.org/jira/browse/FLINK-28867>, or
>> · something new that deserves a separate Jira, or
>> · something wrong with my code?
>>
>>
>>
>> [1]: Parquet Schema
>>
>>
>>
>> root
>> |-- amount: decimal(38,9) (nullable = true)
>> |-- connectionAccountId: string (nullable = true)
>> |-- sourceEntity: struct (nullable = true)
>> |    |-- extendedProperties: array (nullable = true)
>> |    |    |-- element: struct (containsNull = true)
>> |    |    |    |-- key: string (nullable = true)
>> |    |    |    |-- value: string (nullable = true)
>> |    |-- sourceAccountId: string (nullable = true)
>> |    |-- sourceEntityId: string (nullable = true)
>> |    |-- sourceEntityType: string (nullable = true)
>> |    |-- sourceSystem: string (nullable = true)
>>
>>
>>
>>
>>
>> [2]: Schema used in Flink:
>>
>>
>>
>>     static RowType getSchema() {
>>         RowType elementType = RowType.of(
>>             new LogicalType[] {
>>                 new VarCharType(VarCharType.MAX_LENGTH),
>>                 new VarCharType(VarCharType.MAX_LENGTH)
>>             },
>>             new String[] {
>>                 "key",
>>                 "value"
>>             }
>>         );
>>
>>         RowType element = RowType.of(
>>             new LogicalType[] { elementType },
>>             new String[] { "element" }
>>         );
>>
>>         RowType sourceEntity = RowType.of(
>>             new LogicalType[] {
>>                 new ArrayType(element),
>>                 new VarCharType(),
>>                 new VarCharType(),
>>                 new VarCharType(),
>>                 new VarCharType(),
>>             },
>>             new String[] {
>>                 "extendedProperties",
>>                 "sourceAccountId",
>>                 "sourceEntityId",
>>                 "sourceEntityType",
>>                 "sourceSystem"
>>             }
>>         );
>>
>>         return RowType.of(
>>             new LogicalType[] {
>>                 new DecimalType(),
>>                 new VarCharType(),
>>                 sourceEntity
>>             },
>>             new String[] {
>>                 "amount",
>>                 "connectionAccountId",
>>                 "sourceEntity",
>>             }
>>         );
>>     }
>>
>>
>>
>> [3]: Execution Exception:
>>
>> 2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received uncaught exception.
>> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>>     ...
>> Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
>>     at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
>>     at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
>>     at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
>>     at java.base/java.util.Objects.checkIndex(Objects.java:372)
>>     at java.base/java.util.ArrayList.get(ArrayList.java:459)
>>     at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
>>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
>>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
>>     at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
>>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
>>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
>>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
>>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
>>     at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
>>     at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
>>     at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
>>     at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
>>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>>     ... 6 common frames omitted
>>
>>
>>
>> Thanks
>>
>>
>>
>>
