Hi Naehee,

Sorry for the late reply.

I think you are right: there are bugs here. We did not think through nested structures carefully before.
Right now we are mainly focused on the new BulkFormat implementation, so this is something we need to consider when implementing the new ParquetBulkFormat.

Best,
Jingsong

On Tue, Nov 3, 2020 at 1:43 AM Naehee Kim <tych...@gmail.com> wrote:

> Hi Jingsong,
>
> I am forwarding the email below to you, thinking you will have a good idea
> about my questions below. I'd appreciate it if you could share your thoughts.
>
> Thanks,
> Naehee
>
>
> ---------- Forwarded message ---------
> From: Naehee Kim <tych...@gmail.com>
> Date: Thu, Oct 29, 2020 at 4:38 PM
> Subject: Question about processing a 3-level List data type in parquet
> To: <d...@flink.apache.org>
>
>
> Hi Flink Dev Community,
>
> I've found that RowConverter.java in the flink-parquet module doesn't support
> reading a 3-level list type in parquet, though it is able to process a
> 2-level list type.
>
> 3-level
>
> optional group my_list (LIST) {
>   repeated group element {
>     required binary str (UTF8);
>   };
> }
>
>
> 2-level
>
> optional group my_list (LIST) {
>   repeated int32 element;
> }
>
> Reference:
> https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
>
> The parquet file I am testing with was written by a Spark job and it has a
> 3-level list type. When I try to process the parquet file, it runs into a
> 'java.lang.ClassCastException: Expected instance of group converter but got
> "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"'
> error.
>
> I've tested with Flink 1.9 and checked that RowConverter.java still remains the
> same in v1.11. To process a 3-level list, I think RowConverter.java should
> be updated with a new TypeInfo, instead of BasicArrayTypeInfo. (A 2-level
> list can be processed with BasicArrayTypeInfo.) I wonder if my
> understanding is correct and if you have any plan to support a 3-level List
> datatype in parquet.
>
> For your reference, here is a code snippet along with the stack trace.
>
> MessageType readSchema = (new AvroSchemaConverter()).convert(REPORTING_SCHEMA);
> RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(readSchema);
> ParquetRowInputFormat parquetInputFormat = new ParquetRowInputFormat(new Path("file:///test-file.snappy.parquet"), readSchema);
> DataStreamSource<Row> dataSource = env.createInput(parquetInputFormat, rowTypeInfo);
>
> -- stack trace
>
> Job execution failed.
> org.apache.flink.runtime.client.JobExecutionException:
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>     at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626)
>     at org.apache.flink.streaming.util.TestStreamEnvironment.execute(TestStreamEnvironment.java:78)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
>     at com.pinterest.xenon.flink.backfill.TestParquetSource.testParquetInsertionOriginalInputFormat(TestParquetSource.java:322)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>     at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>     at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>     at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>     at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>     at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>     at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>     at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>     at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>     at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>     at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>     at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>     at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>     at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>     at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>     at com.google.testing.junit.runner.internal.junit4.CancellableRequestFactory$CancellableRunner.run(CancellableRequestFactory.java:89)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>     at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
>     at com.google.testing.junit.runner.junit4.JUnit4Runner.run(JUnit4Runner.java:112)
>     at com.google.testing.junit.runner.BazelTestRunner.runTestsInSuite(BazelTestRunner.java:153)
>     at com.google.testing.junit.runner.BazelTestRunner.main(BazelTestRunner.java:84)
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception when processing split: null
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:357)
> Caused by: java.lang.ClassCastException: Expected instance of group converter but got "org.apache.flink.formats.parquet.utils.RowConverter$RowPrimitiveConverter"
>     at org.apache.parquet.io.api.Converter.asGroupConverter(Converter.java:34)
>     at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:267)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>     at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>     at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>     at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.createRecordReader(ParquetRecordReader.java:118)
>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:227)
>     at org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
>     at org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$SplitReader.run(ContinuousFileReaderOperator.java:333)
>
> Thanks,
>
> Naehee
>
>

--
Best, Jingsong Lee
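
For readers following the thread, the failure in the stack trace can be illustrated in miniature. The sketch below is not Flink or Parquet code: `Field`, `Converter`, `PrimitiveConverter`, `GroupConverter`, `twoLevelOnly`, `shapeAware` and `readerAccepts` are all hypothetical stand-ins written for this note. It assumes, based only on the exception message and on Naehee's description, that the reader asks for a group converter whenever the repeated field of a list is a group, and that the existing converter always hands back a primitive one.

```java
import java.util.Arrays;
import java.util.List;

public class ListConverterSketch {

    /** Simplified stand-in for a schema field; NOT the Parquet Type API. */
    static final class Field {
        final String name;
        final List<Field> children; // an empty list means a primitive field

        Field(String name, Field... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }

        boolean isPrimitive() {
            return children.isEmpty();
        }
    }

    /** Stand-in converter base: by default it is not a group converter. */
    static class Converter {
        Converter asGroupConverter() {
            throw new ClassCastException(
                "Expected instance of group converter but got \"" + getClass().getName() + "\"");
        }
    }

    static final class PrimitiveConverter extends Converter {}

    static final class GroupConverter extends Converter {
        @Override
        Converter asGroupConverter() {
            return this;
        }
    }

    /** Assumed buggy behavior: the repeated field always gets a primitive converter. */
    static Converter twoLevelOnly(Field repeatedField) {
        return new PrimitiveConverter();
    }

    /** Sketch of a fix: pick the converter kind from the repeated field's own shape. */
    static Converter shapeAware(Field repeatedField) {
        return repeatedField.isPrimitive() ? new PrimitiveConverter() : new GroupConverter();
    }

    /** Mimics a reader that requests a group converter for every group field. */
    static boolean readerAccepts(Field repeatedField, Converter converter) {
        try {
            if (!repeatedField.isPrimitive()) {
                converter.asGroupConverter();
            }
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Field twoLevel = new Field("element");                      // repeated int32 element
        Field threeLevel = new Field("element", new Field("str"));  // repeated group element { str }

        System.out.println("2-level, two-level-only converter: "
            + readerAccepts(twoLevel, twoLevelOnly(twoLevel)));
        System.out.println("3-level, two-level-only converter: "
            + readerAccepts(threeLevel, twoLevelOnly(threeLevel)));
        System.out.println("3-level, shape-aware converter:    "
            + readerAccepts(threeLevel, shapeAware(threeLevel)));
    }
}
```

In this toy model only the 3-level list combined with the two-level-only converter is rejected, which matches the shape of the reported error. Whether the real fix belongs in RowConverter or in the planned ParquetBulkFormat is left open, as in the reply above.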