OK, I think this issue is caused by an incompatibility between the Parquet files (written with Spark 2.4) and the Parquet version that ParquetIO 2.25 pulls in as a dependency. It seems to work after I switched from the direct runner to the Spark runner and ran the Beam app on a Spark cluster. I assume that this way I am basically using the Parquet jars from the Spark distribution directly, so everything is now compatible.
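One way to check that assumption is a small stdlib-only diagnostic that prints the jar or directory a class was actually loaded from. This is a minimal sketch, not something from the thread and not a Beam or Spark API. The class names in the default list are only examples: the shaded one is copied from the INFO log line in the original report, and any other fully qualified names can be passed as arguments. It needs to run with the same classpath as the pipeline, or WhichJar.locate(...) can be called from inside a DoFn so it reports what the Spark executors see.

```java
import java.net.URL;
import java.security.CodeSource;

/**
 * Diagnostic sketch: print which jar (or class directory) each named class is
 * loaded from, e.g. to compare the Parquet classes seen by different runners.
 */
public class WhichJar {

  /** Returns the code-source location of the named class, or a short status string. */
  static String locate(String className) {
    try {
      // initialize=false: load the class without running its static initializers.
      Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
      CodeSource src = cls.getProtectionDomain().getCodeSource();
      if (src == null || src.getLocation() == null) {
        // Classes from the bootstrap loader (e.g. java.lang.*) have no code source.
        return "bootstrap/JDK";
      }
      URL location = src.getLocation();
      return location.toString();
    } catch (ClassNotFoundException e) {
      return "not on classpath";
    } catch (LinkageError e) {
      // The class file was found, but one of its dependencies could not be loaded.
      return "found but failed to load: " + e;
    }
  }

  public static void main(String[] args) {
    String[] names = args.length > 0 ? args : new String[] {
        "org.apache.parquet.hadoop.InternalParquetRecordReader",
        "shaded.org.apache.parquet.hadoop.InternalParquetRecordReader",
        "org.apache.parquet.avro.AvroParquetReader",
        "org.apache.avro.generic.GenericDatumWriter"
    };
    for (String name : names) {
      System.out.println(name + " -> " + locate(name));
    }
  }
}
```

Comparing the output under the direct runner and under the Spark runner would show whether the two runs really resolve the Parquet classes from different jars.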
From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 7:45 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Hi community,

Can someone take a look at this issue? It is kind of a blocker for me right now. Really appreciate your help!

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 6:13 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

BTW, I tried Avro 1.8 and 1.9 and both give the same error, so we can probably rule out an Avro issue.

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 9:07 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Potential bug with ParquetIO.read when reading arrays

Hi Beam community,

I am seeing an error when reading an array field using ParquetIO. I was using Beam 2.25 and the direct runner for testing. Is this a bug or a known issue? Am I missing anything here? Please help me root-cause this issue. Thanks so much!

Attached are the Avro schema and the Parquet file. Below is the schema tree as a quick visualization. The array field name is “list” and the element type is int. You can see this schema defined in the avsc file as well.

root
 |-- list: array (nullable = true)
 |    |-- element: integer (containsNull = true)

The Beam code is very simple:

pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));

Here is the error when running that code:

[direct-runner-worker] INFO shaded.org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 130 ms. row count = 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
	at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
	at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
	at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
	at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
	at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
	at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
	at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)