It's very likely that Spark SQL performs much better because of SQL push-downs and because it avoids additional serialization/deserialization (ser/de) operations.
At the same time, did you try to leverage "withProjection()" in ParquetIO to project only the fields that you need? Did you use the splittable ParquetIO read? It is not enabled by default; see [1]. Also, using the SDF translation for Read on the Spark runner can cause performance degradation as well (we noticed that in our experiments), so try the non-SDF read if you are not using it already [2]. A short sketch of both options follows below.

PS: Yesterday, at Beam Summit, we (Ismaël and I) gave a related talk. I'm not sure if a recording is available yet, but you can find the slides here [3]; they may be helpful.

— Alexey

[1] https://issues.apache.org/jira/browse/BEAM-12070
[2] https://issues.apache.org/jira/browse/BEAM-10670
[3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing

> On 5 Aug 2021, at 03:07, Tao Li <t...@zillow.com> wrote:
>
> @Alexey Romanenko @Ismaël Mejía I assume you are experts on the Spark
> runner. Can you please take a look at this thread and confirm that this
> JIRA covers the cause: https://issues.apache.org/jira/browse/BEAM-12646 ?
>
> This perf issue is currently a blocker for me.
>
> Thanks so much!
>
> From: Tao Li <t...@zillow.com>
> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
> Date: Friday, July 30, 2021 at 3:53 PM
> To: Andrew Pilloud <apill...@google.com>, "user@beam.apache.org" <user@beam.apache.org>
> Cc: Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
> Subject: Re: Perf issue with Beam on spark (spark runner)
>
> Thanks everyone for your help.
>
> We actually did another round of perf comparison between Beam (on Spark)
> and native Spark, without any projection/filtering in the query (to rule
> out the "predicate pushdown" factor).
>
> Beam with the Spark runner still takes 3-5x as long as native Spark, and
> according to the Spark metrics the cause is
> https://issues.apache.org/jira/browse/BEAM-12646. The Spark runner is
> pretty much the bottleneck.
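For illustration, a minimal sketch of the projection and split options suggested above. The names pipeline, beamRequiredPath, and avroSchema are borrowed from the read code quoted later in this thread, and the projected fields match the query under discussion; treat this as a sketch under those assumptions, not a drop-in change:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.values.PCollection;

    // Avro schema holding only the columns the pipeline actually uses.
    Schema projection = SchemaBuilder.record("bid").fields()
        .requiredLong("auction")
        .requiredLong("bidder")
        .requiredLong("price")
        .endRecord();

    PCollection<GenericRecord> records = pipeline.apply(
        ParquetIO.read(avroSchema)                  // full file schema
            .from(beamRequiredPath)
            .withProjection(projection, projection) // (projection schema, encoder schema)
            .withSplit());                          // splittable read; off by default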
>
> <image001.png>
>
> From: Andrew Pilloud <apill...@google.com>
> Date: Thursday, July 29, 2021 at 2:11 PM
> To: "user@beam.apache.org" <user@beam.apache.org>
> Cc: Tao Li <t...@zillow.com>, Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
> Subject: Re: Perf issue with Beam on spark (spark runner)
>
> Actually, ParquetIO got pushdown in Beam SQL starting in v2.29.0.
>
> Andrew
>
> On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud <apill...@google.com> wrote:
>> Beam SQL doesn't currently have projection pushdown for ParquetIO (we are
>> working to expand this to more IOs). Using ParquetIO withProjection
>> directly will produce better results.
>>
>> On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw <rober...@google.com> wrote:
>>> Could you try using Beam SQL [1] and see if that gives results more
>>> similar to your Spark SQL query? I would also be curious whether the
>>> performance is sufficient when using withProjection [2] to read only the
>>> auction, price, and bidder columns.
>>>
>>> [1] https://beam.apache.org/documentation/dsls/sql/overview/
>>> [2] https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.Read.html#withProjection-org.apache.avro.Schema-org.apache.avro.Schema-
>>>
>>> On Sat, Jul 24, 2021 at 10:23 AM Tao Li <t...@zillow.com> wrote:
>>>> Thanks Robert for filing BEAM-12646
>>>> (https://issues.apache.org/jira/browse/BEAM-12646). This perf issue is
>>>> a blocker for us to adopt Beam. It would be great if the community
>>>> could confirm the root cause and share an ETA for the fix. Thanks so
>>>> much!
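For reference, a minimal sketch of the Beam SQL route suggested above, assuming a schema-aware PCollection<Row> named table like the one built from ParquetIO later in this thread (a single input to SqlTransform is addressable as PCOLLECTION inside the query):

    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // Same computation as the Spark SQL query in this thread. On Beam
    // >= 2.29.0 the projection should be pushed down into the ParquetIO read.
    PCollection<Row> euros = table.apply(
        SqlTransform.query(
            "SELECT auction, 0.82 * price AS euro, bidder FROM PCOLLECTION"));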
>>>>
>>>> From: Robert Bradshaw <rober...@google.com>
>>>> Date: Wednesday, July 21, 2021 at 3:51 PM
>>>> To: Tao Li <t...@zillow.com>
>>>> Cc: "user@beam.apache.org" <user@beam.apache.org>, Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>
>>>> On Wed, Jul 21, 2021 at 3:00 PM Tao Li <t...@zillow.com> wrote:
>>>>> @Robert Bradshaw with the Spark API, the code is actually much simpler.
>>>>> We are just calling the Spark SQL API against a Hive table:
>>>>> spark.sql("SELECT auction, 0.82*(price) as euro, bidder FROM bid")
>>>>
>>>> Good chance that this is pushing the projection of those few fields up
>>>> into the read operator, which could be a dramatic savings. You could try
>>>> doing it manually in Beam, or use Beam SQL, which should do the same.
>>>>
>>>>>
>>>>> I think the "globally windowed GBK" optimization you are proposing is a
>>>>> good callout.
>>>>
>>>> Filed https://issues.apache.org/jira/browse/BEAM-12646 to track.
>>>>
>>>>>
>>>>> From: Robert Bradshaw <rober...@google.com>
>>>>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>>>>> Date: Wednesday, July 21, 2021 at 1:09 PM
>>>>> To: user <user@beam.apache.org>
>>>>> Cc: Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>
>>>>> On Wed, Jul 21, 2021 at 12:51 PM Tao Li <t...@zillow.com> wrote:
>>>>>> Kyle, I don't expect such a huge perf diff either. To your question,
>>>>>> no, I am not specifying withProjection or withSplit for the Parquet
>>>>>> reader.
>>>>>
>>>>> Are you doing so in your Spark code?
>>>>>
>>>>>> Below is my Parquet read code:
>>>>>>
>>>>>> PCollection<FileIO.ReadableFile> files = pipeline
>>>>>>     .apply(FileIO.match().filepattern(beamRequiredPath))
>>>>>>     .apply(FileIO.readMatches());
>>>>>>
>>>>>> PCollection<Row> table = files
>>>>>>     .apply(ParquetIO
>>>>>>         .readFiles(avroSchema)
>>>>>>         .withConfiguration(ImmutableMap.of(
>>>>>>             AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")))
>>>>>>     .apply(MapElements
>>>>>>         .into(TypeDescriptors.rows())
>>>>>>         .via(AvroUtils.getGenericRecordToRowFunction(
>>>>>>             AvroUtils.toBeamSchema(avroSchema))))
>>>>>>     .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
>>>>>>
>>>>>> According to my investigation, it looks like the call stack below is
>>>>>> very computation-intensive and causes a lot of GC time. The stack
>>>>>> appears to come from Spark runner code.
>>>>>
>>>>> This does look inordinately expensive. I wonder if it would make sense
>>>>> to optimize the globally windowed GBK as some other runners do.
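For illustration, one way to try the manual projection Robert mentions is a schema Select right after the read above. This is a sketch: unlike ParquetIO's withProjection, it trims the rows flowing downstream but does not reduce what is actually read from the files:

    import org.apache.beam.sdk.schemas.transforms.Select;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // Keep only the three fields the computation needs; all other columns
    // are dropped before the per-row processing runs.
    PCollection<Row> projected = table.apply(
        Select.fieldNames("auction", "price", "bidder"));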
>>>>>
>>>>>> <image001.png>
>>>>>>
>>>>>> From: Kyle Weaver <kcwea...@google.com>
>>>>>> Date: Tuesday, July 20, 2021 at 3:57 PM
>>>>>> To: Tao Li <t...@zillow.com>
>>>>>> Cc: "user@beam.apache.org" <user@beam.apache.org>, Yuchu Cao <yuc...@trulia.com>
>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>
>>>>>> Beam has its own implementation of Parquet IO and doesn't use Spark's.
>>>>>> It's possible Spark's implementation does more optimizations, though
>>>>>> perhaps not enough to result in such a dramatic difference.
>>>>>>
>>>>>> I'm curious how your Parquet read is configured. In particular, whether
>>>>>> withProjection or withSplit are set.
>>>>>>
>>>>>> On Tue, Jul 20, 2021 at 3:21 PM Tao Li <t...@zillow.com> wrote:
>>>>>>> Hi Kyle,
>>>>>>>
>>>>>>> The ParDo (which uses the code I shared) is the only transformation
>>>>>>> in my pipeline. The input and output are Parquet files in S3 (we are
>>>>>>> using Beam ParquetIO).
>>>>>>>
>>>>>>> From: Kyle Weaver <kcwea...@google.com>
>>>>>>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>>>>>>> Date: Tuesday, July 20, 2021 at 2:13 PM
>>>>>>> To: "user@beam.apache.org" <user@beam.apache.org>
>>>>>>> Cc: Yuchu Cao <yuc...@trulia.com>
>>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>>
>>>>>>> The DoFn you shared is simple enough that it seems unlikely to be the
>>>>>>> performance bottleneck here.
>>>>>>>
>>>>>>> Can you share more information about your complete pipeline? What
>>>>>>> other transforms are there? What sources/sinks are you using?
>>>>>>>
>>>>>>> On Tue, Jul 20, 2021 at 2:02 PM Tao Li <t...@zillow.com> wrote:
>>>>>>>> Hi Beam community,
>>>>>>>>
>>>>>>>> We are seeing a serious perf issue with Beam on the Spark runner
>>>>>>>> compared with a native Spark app. Can you please provide some help?
>>>>>>>>
>>>>>>>> The Beam-on-Spark app takes 8-10 min, whereas the native Spark app
>>>>>>>> takes only 2 min. Below is the Spark UI, from which you can see that
>>>>>>>> the flatMapToPair method is very time-consuming. Is this method call
>>>>>>>> coming from the Spark runner?
>>>>>>>>
>>>>>>>> <image001.png>
>>>>>>>>
>>>>>>>> I suspect this is caused by high GC time. See the "GC Time" column
>>>>>>>> below:
>>>>>>>>
>>>>>>>> <image002.png>
>>>>>>>>
>>>>>>>> The Beam code is really simple, just per-row processing.
>>>>>>>>
>>>>>>>> public class CalcFn extends DoFn<Row, Row> {
>>>>>>>>   protected Logger log = LoggerFactory.getLogger(this.getClass());
>>>>>>>>   private final Schema schema;
>>>>>>>>
>>>>>>>>   public CalcFn(Schema schema) {
>>>>>>>>     this.schema = schema;
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(@Element Row row, OutputReceiver<Row> receiver) {
>>>>>>>>     Long auction_value = (Long) row.getBaseValue("auction");
>>>>>>>>     Long bid_value = (Long) row.getBaseValue("bidder");
>>>>>>>>     Long price = (Long) row.getBaseValue("price");
>>>>>>>>     Double euro = price * 0.82;
>>>>>>>>
>>>>>>>>     receiver.output(Row.withSchema(schema)
>>>>>>>>         .addValues(auction_value, euro, bid_value)
>>>>>>>>         .build());
>>>>>>>>   }
>>>>>>>> }
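For completeness, a sketch of how this DoFn would typically be wired into the pipeline above. The output schema construction here is an assumption, built to match the (auction, euro, bidder) values the DoFn emits:

    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // Assumed output schema for the rows CalcFn builds.
    Schema outputSchema = Schema.builder()
        .addInt64Field("auction")
        .addDoubleField("euro")
        .addInt64Field("bidder")
        .build();

    PCollection<Row> result = table
        .apply(ParDo.of(new CalcFn(outputSchema)))
        .setRowSchema(outputSchema); // Row outputs need an explicit schema/coder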