Sounds like a bug. I think a JIRA with a test case will still be helpful.
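[Editor's note: a minimal reproduction along these lines could serve as the JIRA test case. This is only a sketch: the resource paths (/list.avsc, src/test/resources/list.parquet) and the class/method names are placeholders standing in for the files attached to the thread, and the row-count assertion assumes the attached single-row file.]

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.testing.PAssert;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.values.PCollection;
    import org.junit.Rule;
    import org.junit.Test;

    public class ParquetArrayReadTest {
      @Rule public final transient TestPipeline p = TestPipeline.create();

      @Test
      public void testReadArrayOfInts() throws Exception {
        // Schema with a nullable array-of-int field, as in the attached avsc
        // (path is a placeholder for wherever the attachment is checked in).
        Schema schema =
            new Schema.Parser().parse(getClass().getResourceAsStream("/list.avsc"));

        PCollection<GenericRecord> records =
            p.apply(ParquetIO.read(schema).from("src/test/resources/list.parquet"));

        // The attached file contains a single row; counting avoids comparing
        // GenericRecords directly.
        PAssert.thatSingleton(records.apply(Count.<GenericRecord>globally()))
            .isEqualTo(1L);

        // On Beam 2.25 this currently fails with the ClassCastException
        // quoted below.
        p.run().waitUntilFinish();
      }
    }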
On Fri, Jan 29, 2021 at 2:33 PM Tao Li <t...@zillow.com> wrote:

> @Chamikara Jayalath <chamik...@google.com> Sorry about the confusion, but I
> did more testing, and using the Spark runner actually yields the same error:
>
> java.lang.ClassCastException: shaded.org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>     at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>     at shaded.org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
>     at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
>     at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>     at shaded.org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
>     at shaded.org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>     at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>     at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>     at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>     at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
>     at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
>     at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
>     at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:55)
>     at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$groupByKeyAndWindow$c9b6f5c4$1(GroupNonMergingWindowsFunctions.java:86)
>     at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$bringWindowToKey$0(GroupNonMergingWindowsFunctions.java:129)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
>     at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
>     at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
>     at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>
> From: Chamikara Jayalath <chamik...@google.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Friday, January 29, 2021 at 10:53 AM
> To: user <user@beam.apache.org>
> Subject: Re: Potential bug with ParquetIO.read when reading arrays
>
> Thanks. It might be something good to document in case other users run into
> this as well. Can you file a JIRA with the details?
>
> On Fri, Jan 29, 2021 at 10:47 AM Tao Li <t...@zillow.com> wrote:
>
> OK, I think this issue is due to an incompatibility between the Parquet
> files (created with Spark 2.4) and the Parquet version that ParquetIO 2.25
> depends on. It seems to work after I switch from the direct runner to the
> Spark runner and run the Beam app in a Spark cluster. I assume that by
> doing this I am using the Parquet jars from the Spark distribution
> directly, so now everything is compatible.
>
> From: Tao Li <t...@zillow.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Friday, January 29, 2021 at 7:45 AM
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: Potential bug with ParquetIO.read when reading arrays
>
> Hi community,
>
> Can someone take a look at this issue? It is kind of a blocker for me right
> now. I really appreciate your help!
>
> From: Tao Li <t...@zillow.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Thursday, January 28, 2021 at 6:13 PM
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Re: Potential bug with ParquetIO.read when reading arrays
>
> BTW I tried Avro 1.8 and 1.9 and both give the same error, so we can
> probably rule out an Avro issue.
>
> From: Tao Li <t...@zillow.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Thursday, January 28, 2021 at 9:07 AM
> To: "user@beam.apache.org" <user@beam.apache.org>
> Subject: Potential bug with ParquetIO.read when reading arrays
>
> Hi Beam community,
>
> I am seeing an error when reading an array field using ParquetIO. I was
> using Beam 2.25 and the direct runner for testing. Is this a bug or a known
> issue? Am I missing anything here? Please help me root-cause this issue.
> Thanks so much!
>
> Attached are the Avro schema and the Parquet file. Below is the schema tree
> as a quick visualization. The array field name is "list" and the element
> type is int. You can see this schema defined in the avsc file as well.
>
> root
>  |-- list: array (nullable = true)
>  |    |-- element: integer (containsNull = true)
>
> The Beam code is very simple:
>
>     pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
>
> Here is the error when running that code:
>
> [direct-runner-worker] INFO shaded.org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 130 ms.
> row count = 1
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
> Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>     at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
>     at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>     at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)
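[Editor's note: the avsc attachment is not inline, but an Avro schema matching the tree quoted above would look roughly like the following sketch (the record name "Root" is an assumption). For what it's worth, the "GenericData$Record cannot be cast to java.lang.Number" failure is consistent with the int array elements coming back wrapped in records, e.g. from Parquet's three-level list encoding as written by Spark, rather than as plain ints.]

    {
      "type": "record",
      "name": "Root",
      "fields": [
        {
          "name": "list",
          "type": ["null", {"type": "array", "items": ["null", "int"]}],
          "default": null
        }
      ]
    }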