> On 5 Aug 2021, at 18:17, Tao Li <t...@zillow.com> wrote:
>
> It was a great presentation!
Thanks!

> Regarding my perf testing, I was not doing aggregation, filtering,
> projection or joining. I was simply reading all the fields of parquet and
> then immediately saving the PCollection back to parquet.

Well, of course, if you read all fields (columns) then you don't need column projection. Otherwise, it can give quite a significant performance boost, especially for large tables with many columns.

> Regarding SDF translation, is it enabled by default?

From the Beam 2.30.0 release notes: "Legacy Read transform (non-SDF based Read) is used by default for non-FnAPI opensource runners. Use `use_sdf_read` experimental flag to re-enable SDF based Read transforms ([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))"

— Alexey
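PS: In case it helps, a minimal sketch of what column projection looks like with ParquetIO. The schema and path here are illustrative, not taken from your pipeline:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.values.PCollection;

    // Projection schema listing only the columns the pipeline actually needs.
    Schema projection = SchemaBuilder.record("projection").fields()
        .requiredLong("auction")
        .requiredLong("bidder")
        .requiredLong("price")
        .endRecord();

    // fileSchema is the full Avro schema of the Parquet files.
    // withProjection(projectionSchema, encoderSchema) tells the reader to
    // materialize only the projected columns; here the encoder schema is
    // simply the projection itself.
    PCollection<GenericRecord> records = pipeline.apply(
        ParquetIO.read(fileSchema)
            .from("s3://bucket/path/*.parquet")
            .withProjection(projection, projection));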
> I will check out ParquetIO splittable. Thanks!
>
> From: Alexey Romanenko <aromanenko....@gmail.com>
> Date: Thursday, August 5, 2021 at 6:40 AM
> To: Tao Li <t...@zillow.com>
> Cc: "user@beam.apache.org" <user@beam.apache.org>, Andrew Pilloud <apill...@google.com>, Ismaël Mejía <ieme...@gmail.com>, Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
> Subject: Re: Perf issue with Beam on spark (spark runner)
>
> It's very likely that Spark SQL has much better performance because of SQL push-downs and the avoidance of additional ser/deser operations.
>
> At the same time, did you try to leverage "withProjection()" in ParquetIO and project only the fields that you need?
>
> Did you use the ParquetIO splittable read (it's not enabled by default; fixed in [1])?
>
> Also, using the SDF translation for Read on the Spark runner can cause performance degradation as well (we noticed that in our experiments). Try to use the non-SDF read if you are not already [2].
>
> PS: Yesterday, at Beam Summit, we (Ismaël and I) gave a related talk. I'm not sure if a recording is available yet, but you can find the slides here [3]; they may be helpful.
>
> —
> Alexey
>
> [1] https://issues.apache.org/jira/browse/BEAM-12070
> [2] https://issues.apache.org/jira/browse/BEAM-10670
> [3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing
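For reference, the splittable read mentioned in the quoted mail is a one-line opt-in on ParquetIO. A minimal sketch, with the surrounding wiring being illustrative:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.parquet.ParquetIO;
    import org.apache.beam.sdk.values.PCollection;

    // Without withSplit(), each matched Parquet file is consumed as a whole;
    // with it, the source can split files and read the pieces in parallel.
    PCollection<GenericRecord> records = pipeline.apply(
        ParquetIO.read(fileSchema)
            .from("s3://bucket/path/*.parquet")
            .withSplit());

As of 2.30.0 the legacy, non-SDF Read translation is already the default, so on that version the `use_sdf_read` experiment is only needed if you want to switch back to the SDF-based Read.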
>> On 5 Aug 2021, at 03:07, Tao Li <t...@zillow.com> wrote:
>>
>> @Alexey Romanenko @Ismaël Mejía I assume you are experts on the Spark runner. Can you please take a look at this thread and confirm whether this JIRA covers the causes: https://issues.apache.org/jira/browse/BEAM-12646?
>>
>> This perf issue is currently a blocker for me.
>>
>> Thanks so much!
>>
>> From: Tao Li <t...@zillow.com>
>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>> Date: Friday, July 30, 2021 at 3:53 PM
>> To: Andrew Pilloud <apill...@google.com>, "user@beam.apache.org" <user@beam.apache.org>
>> Cc: Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>
>> Thanks everyone for your help.
>>
>> We actually did another round of perf comparison between Beam (on Spark) and native Spark, without any projection/filtering in the query (to rule out the "predicate pushdown" factor).
>>
>> The Beam job on the Spark runner still takes 3-5x as long as native Spark, and according to the Spark metrics the cause is https://issues.apache.org/jira/browse/BEAM-12646. The Spark runner is pretty much the bottleneck.
>>
>> <image001.png>
>>
>> From: Andrew Pilloud <apill...@google.com>
>> Date: Thursday, July 29, 2021 at 2:11 PM
>> To: "user@beam.apache.org" <user@beam.apache.org>
>> Cc: Tao Li <t...@zillow.com>, Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>
>> Actually, ParquetIO got pushdown in Beam SQL starting at v2.29.0.
>>
>> Andrew
>>
>> On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud <apill...@google.com> wrote:
>>> Beam SQL doesn't currently have projection pushdown for ParquetIO (we are working to expand this to more IOs). Using ParquetIO withProjection directly will produce better results.
>>>
>>> On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw <rober...@google.com> wrote:
>>>> Could you try using Beam SQL [1] and see if that gives a result more similar to your Spark SQL query? I would also be curious whether the performance is sufficient using withProjection [2] to read only the auction, price, and bidder columns.
>>>>
>>>> [1] https://beam.apache.org/documentation/dsls/sql/overview/
>>>> [2] https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.Read.html#withProjection-org.apache.avro.Schema-org.apache.avro.Schema-
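A minimal sketch of the Beam SQL route suggested above, using the query that appears further down the thread (PCOLLECTION is the name Beam SQL gives to a single input PCollection):

    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // 'table' is a PCollection<Row> with an attached Beam schema,
    // e.g. the output of the ParquetIO + AvroUtils conversion quoted below.
    PCollection<Row> result = table.apply(
        SqlTransform.query("SELECT auction, 0.82 * price AS euro, bidder FROM PCOLLECTION"));

Whether this benefits from projection pushdown depends on the Beam version, per Andrew's note above about v2.29.0.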
>>>> On Sat, Jul 24, 2021 at 10:23 AM Tao Li <t...@zillow.com> wrote:
>>>>> Thanks Robert for filing BEAM-12646 (https://issues.apache.org/jira/browse/BEAM-12646). This perf issue is a blocker for us to adopt Beam. It would be great if the community could conclude the root cause and share an ETA for the fix. Thanks so much!
>>>>>
>>>>> From: Robert Bradshaw <rober...@google.com>
>>>>> Date: Wednesday, July 21, 2021 at 3:51 PM
>>>>> To: Tao Li <t...@zillow.com>
>>>>> Cc: "user@beam.apache.org" <user@beam.apache.org>, Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>
>>>>> On Wed, Jul 21, 2021 at 3:00 PM Tao Li <t...@zillow.com> wrote:
>>>>>> @Robert Bradshaw with the Spark API the code is actually much simpler. We are just calling the Spark SQL API against a Hive table: spark.sql("SELECT auction, 0.82*(price) as euro, bidder FROM bid")
>>>>>
>>>>> Good chance that this is pushing the projection of those few fields up into the read operator, which could be a dramatic savings. You could try doing it manually in Beam, or use Beam's SQL, which should do the same.
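For the "doing it manually" option, a minimal sketch using Beam's Select transform on the schema'd rows (field names taken from the query above):

    import org.apache.beam.sdk.schemas.transforms.Select;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // Keep only the fields the downstream DoFn needs. Note this trims rows
    // after they are read; only ParquetIO's withProjection avoids reading
    // the dropped columns in the first place.
    PCollection<Row> projected = table.apply(
        Select.fieldNames("auction", "price", "bidder"));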
>>>>>> I think the "globally windowed GBK" optimization you are proposing is a good callout.
>>>>>
>>>>> Filed https://issues.apache.org/jira/browse/BEAM-12646 to track.
>>>>>
>>>>>> From: Robert Bradshaw <rober...@google.com>
>>>>>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>>>>>> Date: Wednesday, July 21, 2021 at 1:09 PM
>>>>>> To: user <user@beam.apache.org>
>>>>>> Cc: Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>
>>>>>> On Wed, Jul 21, 2021 at 12:51 PM Tao Li <t...@zillow.com> wrote:
>>>>>>> Kyle, I don't expect such a huge perf diff either. To your question: no, I am not specifying withProjection or withSplit for the parquet reader.
>>>>>>
>>>>>> Are you doing so in your Spark code?
>>>>>>
>>>>>>> Below is my parquet read code:
>>>>>>>
>>>>>>> PCollection<FileIO.ReadableFile> files = pipeline
>>>>>>>     .apply(FileIO.match().filepattern(beamRequiredPath))
>>>>>>>     .apply(FileIO.readMatches());
>>>>>>>
>>>>>>> PCollection<Row> table = files
>>>>>>>     .apply(ParquetIO
>>>>>>>         .readFiles(avroSchema)
>>>>>>>         .withConfiguration(ImmutableMap.of(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")))
>>>>>>>     .apply(MapElements
>>>>>>>         .into(TypeDescriptors.rows())
>>>>>>>         .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
>>>>>>>     .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
>>>>>>>
>>>>>>> According to my investigation, it looks like the call stack below is very computation intensive and is causing a lot of GC time. And it looks like the stack comes from Spark runner code.
>>>>>>
>>>>>> This does look inordinately expensive. I wonder if it would make sense to optimize the globally windowed GBK as some other runners do.
>>>>>>
>>>>>>> <image001.png>
>>>>>>>
>>>>>>> From: Kyle Weaver <kcwea...@google.com>
>>>>>>> Date: Tuesday, July 20, 2021 at 3:57 PM
>>>>>>> To: Tao Li <t...@zillow.com>
>>>>>>> Cc: "user@beam.apache.org" <user@beam.apache.org>, Yuchu Cao <yuc...@trulia.com>
>>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>>
>>>>>>> Beam has its own implementation of Parquet IO, and doesn't use Spark's. It's possible Spark's implementation does more optimizations, though perhaps not enough to result in such a dramatic difference.
>>>>>>>
>>>>>>> I'm curious how your Parquet read is configured. In particular, whether withProjection or withSplit are set.
>>>>>>>
>>>>>>> On Tue, Jul 20, 2021 at 3:21 PM Tao Li <t...@zillow.com> wrote:
>>>>>>>> Hi Kyle,
>>>>>>>>
>>>>>>>> The ParDo (which references the code I shared) is the only transformation in my pipeline.
>>>>>>>> The input and output are parquet files in S3 (we are using Beam ParquetIO).
>>>>>>>>
>>>>>>>> From: Kyle Weaver <kcwea...@google.com>
>>>>>>>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>>>>>>>> Date: Tuesday, July 20, 2021 at 2:13 PM
>>>>>>>> To: "user@beam.apache.org" <user@beam.apache.org>
>>>>>>>> Cc: Yuchu Cao <yuc...@trulia.com>
>>>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>>>
>>>>>>>> The DoFn you shared is simple enough that it seems unlikely to be the performance bottleneck here.
>>>>>>>>
>>>>>>>> Can you share more information about your complete pipeline? What other transforms are there? What sources/sinks are you using?
>>>>>>>>
>>>>>>>> On Tue, Jul 20, 2021 at 2:02 PM Tao Li <t...@zillow.com> wrote:
>>>>>>>>> Hi Beam community,
>>>>>>>>>
>>>>>>>>> We are seeing a serious perf issue with Beam using the Spark runner, compared with writing a native Spark app. Can you please provide some help?
>>>>>>>>>
>>>>>>>>> The Beam-on-Spark app takes 8-10 min, whereas the native Spark app takes only 2 min. Below is the Spark UI, from which you can see that the flatMapToPair method is very time consuming. Is this method call coming from the Spark runner?
>>>>>>>>>
>>>>>>>>> <image001.png>
>>>>>>>>>
>>>>>>>>> I suspect this is caused by high GC time. See the "GC Time" column below:
>>>>>>>>>
>>>>>>>>> <image002.png>
>>>>>>>>>
>>>>>>>>> The Beam code is really simple, just per-row processing:
>>>>>>>>>
>>>>>>>>> public class CalcFn extends DoFn<Row, Row> {
>>>>>>>>>   protected Logger log = LoggerFactory.getLogger(this.getClass());
>>>>>>>>>   private final Schema schema;
>>>>>>>>>
>>>>>>>>>   public CalcFn(Schema schema) {
>>>>>>>>>     this.schema = schema;
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @ProcessElement
>>>>>>>>>   public void processElement(@Element Row row, OutputReceiver<Row> receiver) {
>>>>>>>>>     // Convert the price to euros and re-emit the row.
>>>>>>>>>     Long auction_value = (Long) row.getBaseValue("auction");
>>>>>>>>>     Long bid_value = (Long) row.getBaseValue("bidder");
>>>>>>>>>     Long price = (Long) row.getBaseValue("price");
>>>>>>>>>     Double euro = price * 0.82;
>>>>>>>>>
>>>>>>>>>     receiver.output(Row.withSchema(schema)
>>>>>>>>>         .addValues(auction_value, euro, bid_value).build());
>>>>>>>>>   }
>>>>>>>>> }