Thanks @Chamikara Jayalath. I created this JIRA:
https://issues.apache.org/jira/browse/BEAM-11721

From: Chamikara Jayalath <chamik...@google.com>
Date: Friday, January 29, 2021 at 2:47 PM
To: Tao Li <t...@zillow.com>
Cc: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Sounds like a bug. I think a JIRA with a test case will still be helpful.

On Fri, Jan 29, 2021 at 2:33 PM Tao Li <t...@zillow.com> wrote:
@Chamikara Jayalath Sorry about the confusion. I did more testing, and the Spark runner actually yields the same error:

java.lang.ClassCastException: shaded.org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
    at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:55)
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$groupByKeyAndWindow$c9b6f5c4$1(GroupNonMergingWindowsFunctions.java:86)
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$bringWindowToKey$0(GroupNonMergingWindowsFunctions.java:129)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

From: Chamikara Jayalath <chamik...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 10:53 AM
To: user <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Thanks. It might be something good to document in case other users run into this as well. Can you file a JIRA with the details?


On Fri, Jan 29, 2021 at 10:47 AM Tao Li <t...@zillow.com> wrote:
OK, I think this issue is due to an incompatibility between the Parquet files (created with Spark 2.4) and the Parquet version that ParquetIO 2.25 depends on. It seems to work after I switch to the Spark runner (from the direct runner) and run the Beam app in a Spark cluster. I assume that by doing this I am basically using the Parquet jars from the Spark distribution directly, so everything is compatible.
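
For anyone debugging this later: one hypothesis worth testing (not confirmed in this thread) is that Spark wrote the arrays in the standard three-level Parquet LIST layout, and parquet-avro is mapping the inner repeated group to a record rather than to the element type, which would match the GenericData$Record-to-Number cast failure. Below is a minimal sketch that reads the file with plain parquet-avro outside Beam to check how the array materializes; the file path is a placeholder, and whether the parquet.avro.add-list-element-records flag changes the shape depends on your parquet-avro version:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;

    public class PlainParquetAvroCheck {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothesis: with the default (true), parquet-avro can surface the inner
        // repeated group of a three-level LIST as a GenericData$Record instead of
        // the element type; false asks it to unwrap the element instead.
        conf.setBoolean("parquet.avro.add-list-element-records", false);
        try (ParquetReader<GenericRecord> reader =
            AvroParquetReader.<GenericRecord>builder(new Path("/path/to/file.parquet"))
                .withConf(conf)
                .build()) {
          GenericRecord record;
          while ((record = reader.read()) != null) {
            System.out.println(record); // inspect how the "list" field materializes
          }
        }
      }
    }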

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 7:45 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Hi community,

Can someone take a look at this issue? It is kind of a blocker for me right now. I really appreciate your help!

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 6:13 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays
Subject: Re: Potential bug with ParquetIO.read when reading arrays

BTW, I tried Avro 1.8 and 1.9, and both produce the same error, so we can probably rule out an Avro version issue.

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 9:07 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Potential bug with ParquetIO.read when reading arrays
Subject: Potential bug with ParquetIO.read when reading arrays

Hi Beam community,

I am seeing an error when reading an array field using ParquetIO. I was using Beam 2.25 and the direct runner for testing. Is this a bug or a known issue, or am I missing something here? Please help me root-cause this issue. Thanks so much!

Attached are the Avro schema and the Parquet file. Below is the schema tree as a quick visualization: the array field name is “list” and the element type is int. You can see this schema defined in the avsc file as well (a rough reconstruction of the avsc follows the tree).

root
|-- list: array (nullable = true)
|    |-- element: integer (containsNull = true)
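
For readers without the attachment, the avsc presumably looks something like this (a reconstruction from the tree above, not the actual attached file; the record name is a guess):

    {
      "type": "record",
      "name": "root",
      "fields": [
        {
          "name": "list",
          "type": ["null", {"type": "array", "items": ["null", "int"]}],
          "default": null
        }
      ]
    }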

The Beam code is very simple:

    pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
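
Spelled out as a complete program, the reproduction is roughly the following sketch; the class name, schema path, and Parquet path are placeholders, and the options wiring is just one common way to set it up:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class ParquetArrayRepro {
      public static void main(String[] args) throws Exception {
        // Parse the attached avsc (path is a placeholder).
        Schema avroSchema = new Schema.Parser().parse(new java.io.File("schema.avsc"));

        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // With the direct runner, just reading the file is enough to hit the
        // ClassCastException: it is thrown while AvroCoder re-encodes the records.
        PCollection<GenericRecord> records =
            pipeline.apply(ParquetIO.read(avroSchema).from("/path/to/file.parquet"));

        pipeline.run().waitUntilFinish();
      }
    }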

Here is the error when running that code:

[direct-runner-worker] INFO shaded.org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 130 ms. row count = 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)

