Hi all,

I have made good progress in figuring out the root cause of this issue. The 
details are in BEAM-11721. I asked some questions at the end of the JIRA and am 
duplicating them here for visibility. Thanks so much for the help and support 
from the community!


  1.  It's not quite intuitive to create an Avro schema for ParquetIO that 
contains the Spark-defined fields ("list", "element", etc.) when we are 
ingesting Spark-created Parquet files. Is it possible to support the standard 
Avro definition for the array type, like 
("type":"array","elementType":"integer","containsNull":true)? Can Beam do the 
schema translation under the hood to spare users the hassle? (See the schema 
sketch right after this list.)
  2.  Taking a step back, why does ParquetIO require an Avro schema 
specification, while AvroParquetReader does not actually require one? I 
briefly looked at the ParquetIO source code but have not figured it out yet. 
(See the reader sketch below.)
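
To make question 1 concrete, here are two hand-written, illustrative Avro field
definitions (field names and types are just examples, not the attached schema).
The standard Avro shape for a nullable int array would be:

{"name": "list", "type": ["null", {"type": "array", "items": ["null", "int"]}]}

whereas the schema ParquetIO seems to need for a Spark-written file wraps every
element in a record, mirroring Parquet's three-level LIST encoding:

{"name": "list", "type": ["null", {"type": "array", "items": {
  "type": "record", "name": "list", "fields": [
    {"name": "element", "type": ["null", "int"]}]}}]}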

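And for question 2, this is the kind of direct parquet-avro read I have in
mind. A minimal sketch (the path is a placeholder, and the snippet belongs in a
method that throws IOException):

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

// parquet-avro derives the read schema from the file footer,
// so no Avro schema has to be supplied up front.
try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(
            new Path("/path/to/input.parquet")).build()) {
  GenericRecord record;
  while ((record = reader.read()) != null) {
    System.out.println(record);
  }
}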


From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 3:37 PM
To: Chamikara Jayalath <chamik...@google.com>, "user@beam.apache.org" 
<user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Thanks @Chamikara Jayalath, I created this JIRA: 
https://issues.apache.org/jira/browse/BEAM-11721

From: Chamikara Jayalath <chamik...@google.com>
Date: Friday, January 29, 2021 at 2:47 PM
To: Tao Li <t...@zillow.com>
Cc: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Sounds like a bug. I think a JIRA with a test case will still be helpful.

On Fri, Jan 29, 2021 at 2:33 PM Tao Li <t...@zillow.com> wrote:
@Chamikara Jayalath Sorry about the confusion, but I did more testing, and 
using the Spark runner actually yields the same error:

java.lang.ClassCastException: shaded.org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
    at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
    at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
    at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:55)
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$groupByKeyAndWindow$c9b6f5c4$1(GroupNonMergingWindowsFunctions.java:86)
    at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$bringWindowToKey$0(GroupNonMergingWindowsFunctions.java:129)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
    at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

From: Chamikara Jayalath <chamik...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 10:53 AM
To: user <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Thanks. It might be good to document this in case other users run into it as 
well. Can you file a JIRA with the details?


On Fri, Jan 29, 2021 at 10:47 AM Tao Li <t...@zillow.com> wrote:
OK, I think this issue is due to an incompatibility between the Parquet files 
(created with Spark 2.4) and the Parquet version that ParquetIO 2.25 depends 
on. It seems to work after I switch to the Spark runner (from the direct 
runner) and run the Beam app in a Spark cluster. I assume that by doing this I 
am basically using the Parquet jars from the Spark distribution directly, so 
now everything is compatible.

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 7:45 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Hi community,

Can someone take a look at this issue? It is kind of a blocker for me right 
now. I really appreciate your help!

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 6:13 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

BTW, I tried Avro 1.8 and 1.9, and both have the same error, so we can 
probably rule out any Avro issue.

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 9:07 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Potential bug with ParquetIO.read when reading arrays

Hi Beam community,

I am seeing an error when reading an array field using ParquetIO. I was using 
Beam 2.25 and the direct runner for testing. Is this a bug or a known issue? 
Am I missing anything here? Please help me root-cause this issue. Thanks so 
much!

Attached are the Avro schema and the Parquet file. Below is the schema tree as 
a quick visualization. The array field name is “list” and the element type is 
int. You can see this schema defined in the avsc file as well.

root
|-- list: array (nullable = true)
|    |-- element: integer (containsNull = true)

The Beam code is very simple:
pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
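
For completeness, the surrounding setup is essentially this. A minimal sketch 
(paths and the schema file name are placeholders for the attachments, and the 
snippet belongs in a method that throws IOException):

import java.io.File;
import org.apache.avro.Schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

Schema avroSchema = new Schema.Parser().parse(new File("list.avsc")); // attached avsc
String parquetPath = "/path/to/input.parquet";                        // attached parquet file

Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
// ParquetIO.read() produces a PCollection<GenericRecord>; per the trace below,
// the cast fails while the direct runner encodes those records with AvroCoder.
pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
pipeline.run().waitUntilFinish();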

Here is the error when running that code:

[direct-runner-worker] INFO shaded.org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 130 ms. row count = 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
    at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
    at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
    at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
    at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
    at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
    at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
    at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
    at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
    at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)

