Thanks, Jing

Do you know whether this problem will be addressed in 
FLINK-28867<https://issues.apache.org/jira/browse/FLINK-28867>, or does it deserve a 
separate Jira?


From: Jing Ge <j...@ververica.com>
Date: Tuesday, November 15, 2022 at 3:39 PM
To: Benenson, Michael <michael_benen...@intuit.com>
Cc: user@flink.apache.org <user@flink.apache.org>, Deshpande, Omkar 
<omkar_deshpa...@intuit.com>, Vora, Jainik <jainik_v...@intuit.com>
Subject: Re: Reading Parquet file with array of structs cause error

Hi Michael,

Currently, ParquetColumnarRowInputFormat does not support schemas with nested 
columns. If your Parquet file stores Avro records, you might want to try, for 
example, the Avro GenericRecord format [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/formats/parquet/#generic-record

Best regards,
Jing


On Tue, Nov 15, 2022 at 8:52 PM Benenson, Michael via user 
<user@flink.apache.org> wrote:
Hi, folks

I’m using Flink 1.16.0, and I would like to read a Parquet file (attached) that 
has schema [1].

I can read this file with Spark, but when I try to read it with Flink 1.16.0 
(program attached) using schema [2], 
I get an IndexOutOfBoundsException [3].

My code and the Parquet file are attached. Is it:

•  the problem described in 
FLINK-28867<https://issues.apache.org/jira/browse/FLINK-28867>, or
•  something new that deserves a separate Jira, or
•  something wrong with my code?

[1]: Parquet Schema

root
|-- amount: decimal(38,9) (nullable = true)
|-- connectionAccountId: string (nullable = true)
|-- sourceEntity: struct (nullable = true)
|    |-- extendedProperties: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- key: string (nullable = true)
|    |    |    |-- value: string (nullable = true)
|    |-- sourceAccountId: string (nullable = true)
|    |-- sourceEntityId: string (nullable = true)
|    |-- sourceEntityType: string (nullable = true)
|    |-- sourceSystem: string (nullable = true)


[2]: Schema used in Flink:

    static RowType getSchema()
    {
        RowType elementType = RowType.of(
            new LogicalType[] {
                new VarCharType(VarCharType.MAX_LENGTH),
                new VarCharType(VarCharType.MAX_LENGTH)
            },
            new String[] {
                "key",
                "value"
            }
        );

        RowType element = RowType.of(
            new LogicalType[] { elementType },
            new String[] { "element" }
        );

        RowType sourceEntity = RowType.of(
            new LogicalType[] {
                new ArrayType(element),
                new VarCharType(),
                new VarCharType(),
                new VarCharType(),
                new VarCharType(),
            },
            new String[] {
                "extendedProperties",
                "sourceAccountId",
                "sourceEntityId",
                "sourceEntityType",
                "sourceSystem"
            }
        );

        return RowType.of(
            new LogicalType[] {
                new DecimalType(),
                new VarCharType(),
                sourceEntity
            },
            new String[] {
                "amount",
                "connectionAccountId",
                "sourceEntity"
            }
        );
    }

[3]:  Execution Exception:

2022/11/15 11:39:58.657 ERROR o.a.f.c.b.s.r.f.SplitFetcherManager - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
    ...
Caused by: java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:64)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:70)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:248)
    at java.base/java.util.Objects.checkIndex(Objects.java:372)
    at java.base/java.util.ArrayList.get(ArrayList.java:459)
    at org.apache.parquet.schema.GroupType.getType(GroupType.java:216)
    at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:536)
    at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
    at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:503)
    at org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil.createWritableColumnVector(ParquetSplitReaderUtil.java:533)
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createWritableVectors(ParquetVectorizedInputFormat.java:281)
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReaderBatch(ParquetVectorizedInputFormat.java:270)
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createPoolOfBatches(ParquetVectorizedInputFormat.java:260)
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:143)
    at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:77)
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
    at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    ... 6 common frames omitted
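
Editorial sketch, not a confirmed diagnosis: one plausible reading of the trace 
in [3] is that the reader pairs each field of the Flink RowType with the 
children of the Parquet group purely by position. Spark normally writes arrays 
in Parquet's three-level list layout (extendedProperties -> list -> element -> 
key, value), so the inner row with two fields would be matched against a group 
that has only one child. The toy model below assumes that layout, which the 
thread does not show, and uses hypothetical classes (Node, walkByPosition) that 
are not Flink or Parquet APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only: hypothetical classes, not Flink or Parquet APIs. */
final class ParquetListWalkSketch {

    /** A schema node: a leaf when it has no children, otherwise a group. */
    static final class Node {
        final String name;
        final List<Node> children;

        Node(String name, Node... children) {
            this.name = name;
            this.children = new ArrayList<>(Arrays.asList(children));
        }
    }

    /**
     * Walks a logical row and a physical group side by side, matching children
     * by position only. Returns the number of logical leaf fields visited.
     */
    static int walkByPosition(Node logical, Node physical) {
        if (logical.children.isEmpty()) {
            return 1;
        }
        int leaves = 0;
        for (int i = 0; i < logical.children.size(); i++) {
            // Throws IndexOutOfBoundsException if the physical group has fewer children.
            leaves += walkByPosition(logical.children.get(i), physical.children.get(i));
        }
        return leaves;
    }

    /** Assumed three-level list layout of the extendedProperties column. */
    static Node physicalExtendedProperties() {
        return new Node("extendedProperties",
            new Node("list",
                new Node("element", new Node("key"), new Node("value"))));
    }

    /** Shape of the array element declared in [2]: a wrapper row around (key, value). */
    static Node logicalAsDeclared() {
        return new Node("elementRow",
            new Node("element", new Node("key"), new Node("value")));
    }

    static boolean failsWithIndexOutOfBounds(Node logical, Node physical) {
        try {
            walkByPosition(logical, physical);
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // The declared shape asks for child 1 of "list", which has one child: prints true
        System.out.println(
            failsWithIndexOutOfBounds(logicalAsDeclared(), physicalExtendedProperties()));
        // A shape that mirrors the physical layout walks cleanly: prints 2
        System.out.println(
            walkByPosition(physicalExtendedProperties(), physicalExtendedProperties()));
    }
}
```

In this model the shape from [2] fails on the second field of the inner row, 
which matches the form of the reported exception. It only illustrates the 
positional mismatch and is not a proposed fix for the Flink reader.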

Thanks
