Hi all,

Thanks for all the discussions so far (including discussions in BEAM-11721 and 
offline discussions). We will use BEAM-11650 to track the request to make the 
avro schema optional for the ParquetIO.read operation. I can potentially work 
on that ticket later.

There is another issue that I hope to get some help with from the beam 
community. I have posted this question on beam slack channels but I am 
duplicating it here for visibility. Basically I am using ParquetIO (which uses 
AvroParquetReader) to read spark created parquet files (please see attached). 
The inspection result is below. You can see the spark schema is very simple, 
which is just a field of an array of integers:

creator: parquet-mr version 1.10.1 (build 815bcfa4a4aacf66d207b3dc692150d16b5740b9)
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"numbers","type":{"type":"array","elementType":"integer","containsNull":true},"nullable":true,"metadata":{}}]}
file schema: spark_schema
--------------------------------------------------------------------------------
numbers: OPTIONAL F:1
.list: REPEATED F:1
..element: OPTIONAL INT32 R:1 D:3


When I use ParquetIO to read this file, the Avro schema for the 
PCollection<GenericRecord> becomes:
{
  "type": "record",
  "name": "spark_schema",
  "fields": [
    {
      "name": "numbers",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "list",
            "fields": [
              {
                "name": "element",
                "type": [
                  "null",
                  "int"
                ],
                "default": null
              }
            ]
          }
        }
      ],
      "default": null
    }
  ]
}


You can see that the schema becomes an array of record type (which contains an 
"element" field). The reason is probably that internally spark parquet is 
defining a "list" record type.

The problem is that this avro schema is not the one I want to deal with in the 
following beam transforms. Instead I want to retain the original schema defined 
in spark, which is simply an array of integers. Is there an easy way to retain 
the original schema when using ParquetIO to read spark created files? Did 
anyone run into this need? Please advise. Thanks!
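
A minimal sketch of one possible workaround, not something confirmed in this 
thread: unwrap the "list"/"element" wrapper records in a transform right after 
the read. To stay self-contained, the sketch uses plain java.util Maps as a 
stand-in for Avro GenericRecord. The class name ListElementUnwrapper and the 
method unwrap are hypothetical; in a real pipeline the same loop would 
presumably run inside a MapElements or DoFn over the GenericRecord values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Beam, Avro or Parquet.
final class ListElementUnwrapper {

    // Turns [{element: 1}, {element: null}, ...] into [1, null, ...].
    // Each Map stands in for one wrapper record with a single "element" field.
    public static List<Integer> unwrap(List<Map<String, Object>> wrapped) {
        if (wrapped == null) {
            return null; // the "numbers" field itself is nullable
        }
        List<Integer> flat = new ArrayList<>(wrapped.size());
        for (Map<String, Object> wrapper : wrapped) {
            // "element" is nullable too, so null values are carried over as-is.
            flat.add((Integer) wrapper.get("element"));
        }
        return flat;
    }

    // Builds one wrapper record holding the given (possibly null) value.
    private static Map<String, Object> wrapper(Integer value) {
        Map<String, Object> record = new HashMap<>();
        record.put("element", value);
        return record;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> numbers =
            Arrays.asList(wrapper(1), wrapper(null), wrapper(3));
        System.out.println(unwrap(numbers)); // prints [1, null, 3]
    }
}
```

Separately, parquet-avro has a "parquet.avro.add-list-element-records" setting 
that controls whether this wrapper record is generated. Whether ParquetIO 
exposes a way to pass it through depends on the Beam version and is not 
verified here.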



From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Saturday, January 30, 2021 at 11:21 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Please let me rephrase my question. It's understandable that it might be a good 
practice to specify avro schema when reading parquet files (e.g. to support 
schema evolution etc). But sometimes the overhead is more than the benefits. 
Given that AvroParquetReader does not require an avro schema, is it possible to 
make the avro schema specification optional for ParquetIO.read? Thanks!

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Saturday, January 30, 2021 at 1:54 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Hi all,

I have made good progress on figuring out the root cause of this issue. The 
details are in BEAM-11721. I asked some questions at the end of the jira and I 
am just duplicating them here for visibility. Thanks so much for the help and 
support from the community!


  1.  It's not quite intuitive to create an avro schema for ParquetIO that 
contains spark defined fields ("list", "element" etc.) when we are ingesting 
spark created parquet files. Is it possible to support the standard avro 
definition for the array type like 
("type":"array","elementType":"integer","containsNull":true)? Can beam do the 
schema translation under the hood to avoid the hassle for the users?
  2.  Taking a step back, why does ParquetIO require an avro schema 
specification, while AvroParquetReader does not actually require the schema? I 
briefly looked at the ParquetIO source code but have not figured it out yet.



From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 3:37 PM
To: Chamikara Jayalath <chamik...@google.com>, "user@beam.apache.org" 
<user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Thanks @Chamikara Jayalath, I created this jira: 
https://issues.apache.org/jira/browse/BEAM-11721

From: Chamikara Jayalath <chamik...@google.com>
Date: Friday, January 29, 2021 at 2:47 PM
To: Tao Li <t...@zillow.com>
Cc: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Sounds like a bug. I think JIRA with a test case will still be helpful.

On Fri, Jan 29, 2021 at 2:33 PM Tao Li <t...@zillow.com> wrote:
@Chamikara Jayalath Sorry about the confusion. But I did more testing and 
using the spark runner actually yields the same error:

java.lang.ClassCastException: shaded.org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
                at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
                at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
                at shaded.org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
                at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
                at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
                at shaded.org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
                at shaded.org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
                at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
                at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
                at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
                at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
                at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
                at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
                at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
                at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
                at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
                at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
                at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:55)
                at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$groupByKeyAndWindow$c9b6f5c4$1(GroupNonMergingWindowsFunctions.java:86)
                at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$bringWindowToKey$0(GroupNonMergingWindowsFunctions.java:129)
                at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
                at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
                at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
                at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
                at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
                at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
                at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
                at org.apache.spark.scheduler.Task.run(Task.scala:123)
                at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
                at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
                at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

From: Chamikara Jayalath <chamik...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 10:53 AM
To: user <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Thanks. It might be something good to document in case other users run into 
this as well. Can you file a JIRA with the details?


On Fri, Jan 29, 2021 at 10:47 AM Tao Li <t...@zillow.com> wrote:
OK I think this issue is due to an incompatibility between the parquet files 
(created with spark 2.4) and the parquet version that ParquetIO 2.25 depends 
on. It seems to work after I switch to the spark runner (from the direct 
runner) and run the beam app in a spark cluster. I assume by doing this I am 
basically using the parquet jars from the spark distributable directly and now 
everything is compatible.

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 7:45 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Hi community,

Can someone take a look at this issue? It is kind of a blocker for me right 
now. Really appreciate your help!

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 6:13 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

BTW I tried avro 1.8 and 1.9 and both have the same error. So we can probably 
rule out any avro issue.

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 9:07 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Potential bug with ParquetIO.read when reading arrays

Hi Beam community,

I am seeing an error when reading an array field using ParquetIO. I was using 
beam 2.25 and the direct runner for testing. Is this a bug or a known issue? Am 
I missing anything here? Please help me root cause this issue. Thanks so much!

Attached are the avro schema and the parquet file. Below is the schema tree as 
a quick visualization. The array field name is “list” and the element type is 
int. You can see this schema defined in the avsc file as well.

root
|-- list: array (nullable = true)
|    |-- element: integer (containsNull = true)

The beam code is very simple: 
pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));

Here is the error when running that code:

[direct-runner-worker] INFO shaded.org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 130 ms. row count = 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
                at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
                at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
                at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
                at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
                at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
                at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
                at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
                at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
                at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
                at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
                at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
                at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
                at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
                at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
                at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
                at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
                at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
                at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
                at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
                at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
                at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
                at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
                at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
                at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
                at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
                at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
                at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
                at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
                at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
                at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
                at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)


Attachment: from-spark.snappy.parquet
