Hi Naehee,

Thanks for reporting the issue. Yes, it is a bug in the ParquetInputFormat. Would you please create a Jira ticket and assign it to me? I will try to fix it by the end of this weekend. My Jira account name is Zhenqiu Huang.

Thanks
Best Regards
Peter Huang

On Wed, Nov 4, 2020 at 11:57 PM Naehee Kim <tych...@gmail.com> wrote:

> Hi Jingsong,
>
> Thanks for the feedback. Can you let me know the concept and timeline of
> BulkFormat/ParquetBulkFormat and how they differ from ParquetInputFormat?
>
> Our use case is backfill: processing parquet files whenever a data issue
> is found in the normal processing of Kafka input. We therefore want a job
> that can easily switch between Kafka input and parquet file input, and
> vice versa. I wonder whether ParquetBulkFormat can fit our use case.
>
> Best,
> Naehee
>
> On Tue, Nov 3, 2020 at 10:09 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Hi Naehee, sorry for the late reply.
>>
>> I think you are right, there are bugs here. We didn't think about nested
>> structures very well before.
>>
>> We now mainly focus on the new BulkFormat implementation; nested
>> structures are something we need to consider when implementing the new
>> ParquetBulkFormat.
>>
>> Best,
>> Jingsong
>>
>> On Tue, Nov 3, 2020 at 1:43 AM Naehee Kim <tych...@gmail.com> wrote:
>>
>>> Hi Jingsong,
>>>
>>> I am forwarding the email below to you, thinking you will have a good
>>> idea about my questions. I'd appreciate it if you could share your
>>> thoughts.
>>>
>>> Thanks,
>>> Naehee
>>>
>>>
>>> ---------- Forwarded message ---------
>>> From: Naehee Kim <tych...@gmail.com>
>>> Date: Thu, Oct 29, 2020 at 4:38 PM
>>> Subject: Question about processing a 3-level List data type in parquet
>>> To: <d...@flink.apache.org>
>>>
>>>
>>> Hi Flink Dev Community,
>>>
>>> I've found that RowConverter.java in the flink-parquet module does not
>>> support reading a 3-level list type in parquet, though it is able to
>>> process a 2-level list type.
>>>
>>> 3-level:
>>>
>>> optional group my_list (LIST) {
>>>   repeated group element {
>>>     required binary str (UTF8);
>>>   };
>>> }
>>>
>>> 2-level:
>>>
>>> optional group my_list (LIST) {
>>>   repeated int32 element;
>>> }
>>>
>>> Reference:
>>> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
>>>
>>> The parquet file I am testing with was written by a Spark job, and it
>>> has a 3-level list type. When I try to process the parquet file, it fails
>>> with a 'java.lang.ClassCastException: Expected instance of group converter
>>> but got
>>> "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'
>>> error.
>>>
>>> I've tested with Flink 1.9 and confirmed that RowConverter.java remains
>>> unchanged in v1.11. To process a 3-level list, I think RowConverter.java
>>> should be updated with a new TypeInfo instead of BasicArrayTypeInfo. (A
>>> 2-level list can be processed with BasicArrayTypeInfo.) I wonder if my
>>> understanding is correct and if you have any plan to support a 3-level
>>> List data type in parquet.
>>>
>>> For your reference, here is the code snippet along with the stack trace.
>>>
>>> MessageType readSchema =
>>>     (new AvroSchemaConverter()).convert(REPORTING_SCHEMA);
>>> RowTypeInfo rowTypeInfo =
>>>     (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
>>> ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(
>>>     new Path("file:///test-file.snappy.parquet"), readSchema);
>>> DataStreamSource<Row> dataSource =
>>>     env.createInput(parquetInputFormat, rowTypeInfo);
>>>
>>> -- stack trace
>>>
>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>   at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>   at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
>>>   at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
>>>   at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>   at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>>>   at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>>>   at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>>>   at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>>>   at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>>>   at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>>>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>>>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>>>   at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>>>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>>>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>>>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>>>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>>>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>>>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>>>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>>>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>>>   at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
>>>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>>>   at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>>>   at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
>>>   at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
>>>   at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
>>> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
>>>   at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
>>>   at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>>>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>>>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>>>   at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>>>   at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>>>   at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
>>>   at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
>>>   at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
>>>   at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
>>>   at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)
>>>
>>> Thanks,
>>>
>>> Naehee
>>>
>>
>> --
>> Best, Jingsong Lee
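
For context on the bug itself: the parquet-format LIST spec linked in the thread defines backward-compatibility rules a reader must apply to the single repeated field inside a LIST-annotated group, to decide whether that field is the element itself (legacy 2-level layout) or only a wrapper around the element (standard 3-level layout). Below is a minimal sketch of those rules against the parquet-mr schema API; the helper class is hypothetical and is not the actual Flink fix.

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.Type;

// Hypothetical helper illustrating the LIST backward-compatibility rules from
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
public final class ListElementResolver {

    // Given a LIST-annotated group, return the type of its elements.
    static Type resolveElementType(GroupType listType) {
        // A LIST-annotated group carries exactly one repeated field.
        Type repeated = listType.getType(0);

        // 2-level: the repeated field is the element itself,
        // e.g. "repeated int32 element;".
        if (repeated.isPrimitive()) {
            return repeated;
        }

        GroupType repeatedGroup = repeated.asGroupType();

        // Legacy 2-level variants: a repeated group that is itself the
        // element, recognizable by having multiple fields or by the
        // "array" / "<name>_tuple" naming conventions.
        if (repeatedGroup.getFieldCount() > 1
                || repeatedGroup.getName().equals("array")
                || repeatedGroup.getName().equals(listType.getName() + "_tuple")) {
            return repeatedGroup;
        }

        // 3-level: the repeated group is only a wrapper; the real element is
        // its single child (the "str" field in the schema above). A converter
        // must install a group converter for the wrapper; treating the wrapper
        // as a primitive is what produces the ClassCastException in the trace.
        return repeatedGroup.getType(0);
    }
}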
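
On the TypeInfo point raised in the thread: BasicArrayTypeInfo only describes arrays of boxed primitives and Strings, so a 3-level list whose elements are groups would plausibly need ObjectArrayTypeInfo over a RowTypeInfo instead. The following sketch uses Flink's public type APIs to show the two shapes side by side; the field name "str" is taken from the schema above, and this is illustrative rather than the committed fix.

import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class ListTypeInfoExample {
    public static void main(String[] args) {
        // A 2-level list of a primitive ("repeated int32 element;") maps
        // naturally onto BasicArrayTypeInfo.
        TypeInformation<Integer[]> flat = BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO;

        // A 3-level list whose element is a group (here a single UTF8 field
        // "str") is an array of rows, which BasicArrayTypeInfo cannot express.
        RowTypeInfo element = new RowTypeInfo(
                new TypeInformation<?>[] {Types.STRING},
                new String[] {"str"});
        TypeInformation<Row[]> nested =
                ObjectArrayTypeInfo.getInfoFor(Row[].class, element);

        System.out.println(flat);   // BasicArrayTypeInfo<Integer>
        System.out.println(nested); // ObjectArrayTypeInfo<Row(str: String)>
    }
}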