Hi all,

Thanks for all the discussions so far (including the discussion in BEAM-11721 and offline discussions). We will use BEAM-11650 to track the request to make the Avro schema optional for the ParquetIO.read operation. I can potentially work on that ticket later.
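For context on that request: AvroParquetReader does not need an explicit schema because it can derive the Avro schema from the Parquet file footer. A minimal standalone sketch outside of Beam (class name and file path are just placeholders, not from this thread):

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadWithoutSchema {
  public static void main(String[] args) throws Exception {
    // No Avro schema is supplied here; AvroParquetReader derives one from the Parquet file metadata.
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(new Path("/path/to/from-spark.snappy.parquet")).build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        // The record's schema comes from the file itself.
        System.out.println(record.getSchema());
        System.out.println(record);
      }
    }
  }
}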
There is another issue that I hope to get some help with from the Beam community. I have posted this question on the Beam Slack channels, but I am duplicating it here for visibility.

Basically, I am using ParquetIO (which uses AvroParquetReader) to read Spark-created Parquet files (please see attached). The inspection result is below. You can see the Spark schema is very simple, just one field holding an array of integers:

creator:     parquet-mr version 1.10.1 (build 815bcfa4a4aacf66d207b3dc692150d16b5740b9)
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"numbers","type":{"type":"array","elementType":"integer","containsNull":true},"nullable":true,"metadata":{}}]}

file schema: spark_schema
--------------------------------------------------------------------------------
numbers:     OPTIONAL F:1
.list:       REPEATED F:1
..element:   OPTIONAL INT32 R:1 D:3

When I use ParquetIO to read this file, the Avro schema for the PCollection<GenericRecord> becomes:

{
  "type": "record",
  "name": "spark_schema",
  "fields": [
    {
      "name": "numbers",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "list",
            "fields": [
              {
                "name": "element",
                "type": ["null", "int"],
                "default": null
              }
            ]
          }
        }
      ],
      "default": null
    }
  ]
}

You can see that the schema becomes an array of a record type (which contains an "element" field). The reason is probably that Spark's Parquet writer internally defines a "list" record type. The problem is that this Avro schema is not the one I want to deal with in the following Beam transforms. Instead, I want to retain the original schema defined in Spark, which is simply an array of integers. Is there an easy way to retain the original schema when using ParquetIO to read Spark-created files? Did anyone run into this need? Please advise. Thanks!

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Saturday, January 30, 2021 at 11:21 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Please let me rephrase my question. It's understandable that it might be good practice to specify an Avro schema when reading Parquet files (e.g. to support schema evolution). But sometimes the overhead outweighs the benefits. Given that AvroParquetReader does not require an Avro schema, is it possible to make the Avro schema specification optional for ParquetIO.read? Thanks!

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Saturday, January 30, 2021 at 1:54 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Hi all,

I have made good progress figuring out the root cause of this issue. The details are in BEAM-11721. I asked some questions at the end of the JIRA and I am duplicating them here for visibility. Thanks so much for the help and support from the community!

1. It's not very intuitive to create an Avro schema for ParquetIO that contains the Spark-defined fields ("list", "element", etc.) when we are ingesting Spark-created Parquet files. Is it possible to support the standard Avro definition for the array type (something like {"type":"array","elementType":"integer","containsNull":true})? Can Beam do the schema translation under the hood to avoid this hassle for users? (A sketch of the translation I have in mind is below, after these two questions.)
2. Taking a step back, why does ParquetIO require an Avro schema specification when AvroParquetReader does not actually require one?
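Here is the sketch I mentioned for question 1: a DoFn that converts the wrapped records ParquetIO produces (under the schema shown above) into records whose "numbers" field is a plain array of ints. This is purely illustrative of the translation I have in mind; names like UnwrapListFn and the target schema string are my own, not anything Beam provides today.

import java.util.ArrayList;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;

// Illustrative only: unwraps the {"element": ...} records produced for the Spark list layout
// into a plain array of ints.
public class UnwrapListFn extends DoFn<GenericRecord, GenericRecord> {

  // The schema I actually want downstream: "numbers" is an optional array of optional ints.
  static final Schema TARGET_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"spark_schema\",\"fields\":["
          + "{\"name\":\"numbers\",\"type\":[\"null\",{\"type\":\"array\","
          + "\"items\":[\"null\",\"int\"]}],\"default\":null}]}");

  @ProcessElement
  public void processElement(@Element GenericRecord wrapped, OutputReceiver<GenericRecord> out) {
    GenericRecord result = new GenericData.Record(TARGET_SCHEMA);
    Object numbers = wrapped.get("numbers");
    if (numbers != null) {
      List<Object> ints = new ArrayList<>();
      for (Object item : (List<?>) numbers) {
        // Each array item is a {"element": <int or null>} record in the wrapped schema.
        ints.add(item == null ? null : ((GenericRecord) item).get("element"));
      }
      result.put("numbers", ints);
    }
    out.output(result);
  }
}

I would apply it right after the read, something like records.apply(ParDo.of(new UnwrapListFn())).setCoder(AvroCoder.of(UnwrapListFn.TARGET_SCHEMA)), but it would be much nicer if ParquetIO could do this translation itself.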
I briefly looked at the ParquetIO source code but have not figured it out yet.

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 3:37 PM
To: Chamikara Jayalath <chamik...@google.com>, "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Thanks @Chamikara Jayalath. I created this JIRA: https://issues.apache.org/jira/browse/BEAM-11721

From: Chamikara Jayalath <chamik...@google.com>
Date: Friday, January 29, 2021 at 2:47 PM
To: Tao Li <t...@zillow.com>
Cc: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Sounds like a bug. I think a JIRA with a test case will still be helpful.

On Fri, Jan 29, 2021 at 2:33 PM Tao Li <t...@zillow.com> wrote:

@Chamikara Jayalath Sorry about the confusion. But I did more testing, and using the Spark runner actually yields the same error:

java.lang.ClassCastException: shaded.org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
  at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
  at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
  at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
  at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
  at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
  at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
  at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
  at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:73)
  at org.apache.beam.sdk.coders.KvCoder.encode(KvCoder.java:37)
  at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:591)
  at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:582)
  at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:542)
  at org.apache.beam.runners.spark.coders.CoderHelpers.toByteArray(CoderHelpers.java:55)
  at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$groupByKeyAndWindow$c9b6f5c4$1(GroupNonMergingWindowsFunctions.java:86)
  at org.apache.beam.runners.spark.translation.GroupNonMergingWindowsFunctions.lambda$bringWindowToKey$0(GroupNonMergingWindowsFunctions.java:129)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators$6.transform(Iterators.java:785)
  at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TransformedIterator.next(TransformedIterator.java:47)
  at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
  at scala.collection.Iterator$$anon$12.next(Iterator.scala:445)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

From: Chamikara Jayalath <chamik...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 10:53 AM
To: user <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Thanks. It might be something good to document in case other users run into this as well. Can you file a JIRA with the details?

On Fri, Jan 29, 2021 at 10:47 AM Tao Li <t...@zillow.com> wrote:

OK, I think this issue is due to an incompatibility between the Parquet files (created with Spark 2.4) and the Parquet version pulled in as a dependency of ParquetIO in Beam 2.25. It seems to work after I switch to the Spark runner (from the direct runner) and run the Beam app in a Spark cluster. I assume by doing this I am basically using the Parquet jars from the Spark distribution directly, and now everything is compatible.

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, January 29, 2021 at 7:45 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

Hi community,

Can someone take a look at this issue? It is kind of a blocker for me right now. Really appreciate your help!

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 6:13 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Re: Potential bug with ParquetIO.read when reading arrays

BTW I tried Avro 1.8 and 1.9 and both give the same error, so we can probably rule out an Avro issue.

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Thursday, January 28, 2021 at 9:07 AM
To: "user@beam.apache.org" <user@beam.apache.org>
Subject: Potential bug with ParquetIO.read when reading arrays

Hi Beam community,

I am seeing an error when reading an array field using ParquetIO. I was using Beam 2.25 and the direct runner for testing. Is this a bug or a known issue? Am I missing anything here? Please help me root-cause this issue. Thanks so much!

Attached are the Avro schema and the Parquet file. Below is the schema tree as a quick visualization. The array field name is "list" and the element type is int. You can see this schema defined in the avsc file as well.
root
 |-- list: array (nullable = true)
 |    |-- element: integer (containsNull = true)

The Beam code is very simple:

  pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));

Here is the error when running that code:

[direct-runner-worker] INFO shaded.org.apache.parquet.hadoop.InternalParquetRecordReader - block read in memory in 130 ms. row count = 1
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
  at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
  at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
  at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
  at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
  at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
  at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
  at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
  at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
  at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
  at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
  at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
  at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
  at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
  at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
  at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
  at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
  at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
  at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
  at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
  at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
  at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)
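For completeness, the whole repro is roughly the following sketch (class name and file paths are placeholders; the schema is parsed from the attached avsc):

import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ParquetReadRepro {
  public static void main(String[] args) throws Exception {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Avro schema from the attached avsc file (path is a placeholder).
    Schema avroSchema = new Schema.Parser().parse(new File("/path/to/schema.avsc"));

    // Reading the Spark-written Parquet file with that schema triggers the ClassCastException above.
    PCollection<GenericRecord> records =
        pipeline.apply(ParquetIO.read(avroSchema).from("/path/to/data.parquet"));

    pipeline.run().waitUntilFinish();
  }
}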
Attachment: from-spark.snappy.parquet