Hi Naehee,

Thanks for reporting the issue. Yes, it is a bug in ParquetInputFormat.
Would you please create a Jira ticket and assign it to me? I will try to fix
it by the end of this weekend.
My Jira account name is Zhenqiu Huang. Thanks.


Best Regards
Peter Huang


On Wed, Nov 4, 2020 at 11:57 PM Naehee Kim <tych...@gmail.com> wrote:

> Hi Jingsong,
>
> Thanks for the feedback. Can you let me know the concept and timeline of
> BulkFormat/ParquetBulkFormat, and how it differs from ParquetInputFormat?
>
> Our use case is backfill: reprocessing parquet files when a data issue is
> found in the normal processing of Kafka input. Thus, we want a job that can
> easily switch between Kafka input and parquet file input. I wonder if
> ParquetBulkFormat can fit our use case.
>
> Best,
> Naehee
>
> On Tue, Nov 3, 2020 at 10:09 PM Jingsong Li <jingsongl...@gmail.com>
> wrote:
>
>> Hi Naehee, sorry for the late reply.
>>
>> I think you are right, there are bugs here. We did not give enough thought
>> to nested structures before.
>>
>> We are now mainly focused on the new BulkFormat implementation, and we need
>> to take this into account when implementing the new ParquetBulkFormat.
>>
>> Best,
>> Jingsong
>>
>> On Tue, Nov 3, 2020 at 1:43 AM Naehee Kim <tych...@gmail.com> wrote:
>>
>>> Hi Jingsong,
>>>
>>> I am forwarding the email below to you, since I think you will have a good
>>> idea about my questions. I'd appreciate it if you could share your thoughts.
>>>
>>> Thanks,
>>> Naehee
>>>
>>>
>>> ---------- Forwarded message ---------
>>> From: Naehee Kim <tych...@gmail.com>
>>> Date: Thu, Oct 29, 2020 at 4:38 PM
>>> Subject: Question about processing a 3-level List data type in parquet
>>> To: <d...@flink.apache.org>
>>>
>>>
>>> Hi Flink Dev Community,
>>>
>>> I've found that RowConverter.java in the flink-parquet module doesn't
>>> support reading a 3-level list type in parquet, though it can process a
>>> 2-level list type.
>>>
>>> 3-level
>>>
>>> optional group my_list (LIST) {
>>>   repeated group element {
>>>     required binary str (UTF8);
>>>   };
>>> }
>>>
>>>
>>> 2-level
>>>
>>> optional group my_list (LIST) {
>>>   repeated int32 element;
>>> }
>>>
>>> Reference:
>>> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
>>>
>>> The parquet file I am testing with was written by a Spark job and it has a
>>> 3-level list type. When I try to process the parquet file, it runs into a
>>> 'java.lang.ClassCastException: Expected instance of group converter but got
>>> "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'
>>> error.
>>>
>>> I've tested with Flink 1.9 and checked that RowConverter.java remains
>>> the same in v1.11. To process a 3-level list, I think RowConverter.java
>>> should be updated with a new TypeInfo, instead of BasicArrayTypeInfo. (A
>>> 2-level list can be processed with BasicArrayTypeInfo.) I wonder if
>>> my understanding is correct and if you have any plan to support a 3-level
>>> List datatype in parquet.
>>>
>>> For your reference, here is a code snippet along with the stack trace.
>>>
>>> MessageType readSchema =
>>>     new AvroSchemaConverter().convert(REPORTING_SCHEMA);
>>> RowTypeInfo rowTypeInfo =
>>>     (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
>>> ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(
>>>     new Path("file:///test-file.snappy.parquet"), readSchema);
>>> DataStreamSource<Row> dataSource =
>>>     env.createInput(parquetInputFormat, rowTypeInfo);
>>>
>>> -- stack trace
>>>
>>> Job execution failed.
>>> org.apache.flink.runtime.client.JobExecutionException:
>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
>>>     at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
>>>     at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>>     at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
>>>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>>>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>>>     at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
>>>     at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
>>>     at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
>>> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
>>>     at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
>>>     at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>>>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>>>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>>>     at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>>>     at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>>>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
>>>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
>>>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
>>>     at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
>>>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)
>>>
>>> Thanks,
>>>
>>> Naehee
>>>
>>>
>>
>> --
>> Best, Jingsong Lee
>>
>
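
Note on the two list layouts quoted in this thread: the difference comes
down to whether the repeated field inside the LIST group is a primitive
(2-level) or a group that wraps the element (3-level). The reported
ClassCastException is consistent with a primitive converter being supplied
where Parquet expects a group converter. Below is a minimal Java sketch of
that structural check. SchemaNode, Layout, and classify are hypothetical
names used only for illustration; they are not part of parquet-mr or
flink-parquet, and the check follows this thread's 2-level/3-level
terminology rather than the full backward-compatibility rules in the
Parquet LogicalTypes document.

```java
import java.util.Arrays;
import java.util.List;

final class ListLayoutSketch {

    // Hypothetical, simplified stand-in for a Parquet schema node.
    // This is NOT the parquet-mr Type API.
    static final class SchemaNode {
        final String name;
        final boolean repeated;
        final boolean primitive;
        final List<SchemaNode> children; // empty for primitive fields

        private SchemaNode(String name, boolean repeated, boolean primitive,
                           List<SchemaNode> children) {
            this.name = name;
            this.repeated = repeated;
            this.primitive = primitive;
            this.children = children;
        }

        static SchemaNode primitive(String name, boolean repeated) {
            return new SchemaNode(name, repeated, true, Arrays.asList());
        }

        static SchemaNode group(String name, boolean repeated, SchemaNode... children) {
            return new SchemaNode(name, repeated, false, Arrays.asList(children));
        }
    }

    enum Layout { TWO_LEVEL, THREE_LEVEL, NOT_A_LIST }

    // Classifies a LIST-annotated group by the shape of its single repeated child.
    static Layout classify(SchemaNode listGroup) {
        if (listGroup.primitive || listGroup.children.size() != 1) {
            return Layout.NOT_A_LIST;
        }
        SchemaNode repeatedField = listGroup.children.get(0);
        if (!repeatedField.repeated) {
            return Layout.NOT_A_LIST;
        }
        // A repeated primitive holds the elements directly (2-level);
        // a repeated group wraps each element in one more level (3-level).
        return repeatedField.primitive ? Layout.TWO_LEVEL : Layout.THREE_LEVEL;
    }

    public static void main(String[] args) {
        // Mirrors the "3-level" schema quoted in the thread.
        SchemaNode threeLevel = SchemaNode.group("my_list", false,
                SchemaNode.group("element", true, SchemaNode.primitive("str", false)));
        // Mirrors the "2-level" schema quoted in the thread.
        SchemaNode twoLevel = SchemaNode.group("my_list", false,
                SchemaNode.primitive("element", true));

        System.out.println(classify(threeLevel)); // prints THREE_LEVEL
        System.out.println(classify(twoLevel));   // prints TWO_LEVEL
    }
}
```

A converter that handles both layouts would presumably need a group
converter for the repeated field in the THREE_LEVEL case and a primitive
converter only in the TWO_LEVEL case; how RowConverter should do this is
left to the actual fix.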
