Hi Alexey,

It was a great presentation!

Regarding my perf testing, I was not doing aggregation, filtering, projection, 
or joining. I was simply reading all the fields of the Parquet files and then 
immediately saving the PCollection back to Parquet.

Regarding SDF translation, is it enabled by default?

I will check out ParquetIO splittable. Thanks!

From: Alexey Romanenko <aromanenko....@gmail.com>
Date: Thursday, August 5, 2021 at 6:40 AM
To: Tao Li <t...@zillow.com>
Cc: "user@beam.apache.org" <user@beam.apache.org>, Andrew Pilloud 
<apill...@google.com>, Ismaël Mejía <ieme...@gmail.com>, Kyle Weaver 
<kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
Subject: Re: Perf issue with Beam on spark (spark runner)

It’s very likely that Spark SQL has much better performance because of SQL 
push-downs and the avoidance of additional ser/deser operations.

At the same time, did you try to leverage "withProjection()" in ParquetIO to 
project only the fields that you need?

Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])?
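For reference, a minimal sketch of a read with both options set (the schema variables, bucket path, and pipeline variable here are illustrative assumptions, not taken from your code):

```java
// Hedged sketch: Parquet read with a column projection and splittable reading
// enabled. "fullSchema" and "projectionSchema" are assumed Avro schemas; the
// S3 path is hypothetical.
PCollection<GenericRecord> records = pipeline.apply(
    ParquetIO.read(fullSchema)
        .from("s3://your-bucket/path/*.parquet")
        // first arg: fields to read; second arg: schema used for encoding
        .withProjection(projectionSchema, projectionSchema)
        // opt in to splittable (SDF-based) file reading
        .withSplit());
```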

Also, using the SDF translation for Read on the Spark runner can cause 
performance degradation as well (we noticed that in our experiments). Try to 
use the non-SDF read (if you are not already) [2]
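A sketch of requesting the non-SDF read via the `use_deprecated_read` experiment (worth double-checking the flag against your Beam version):

```java
// Hedged sketch: ask the runner to use the legacy (non-SDF) Read translation
// by adding the "use_deprecated_read" experiment to the pipeline options.
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
ExperimentalOptions.addExperiment(
    options.as(ExperimentalOptions.class), "use_deprecated_read");
Pipeline pipeline = Pipeline.create(options);
```

Equivalently, the flag can be passed on the command line as `--experiments=use_deprecated_read`.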


PS: Yesterday, at Beam Summit, we (Ismael and I) gave a related talk. I’m not 
sure if a recording is available yet, but you can find the slides here [3]; 
they may be helpful.


—
Alexey

[1] https://issues.apache.org/jira/browse/BEAM-12070
[2] https://issues.apache.org/jira/browse/BEAM-10670
[3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing



On 5 Aug 2021, at 03:07, Tao Li <t...@zillow.com> wrote:

@Alexey Romanenko @Ismaël Mejía I assume you are experts on the Spark runner. 
Can you please take a look at this thread and confirm whether this JIRA covers 
the causes: https://issues.apache.org/jira/browse/BEAM-12646 ?

This perf issue is currently a blocker for me.

Thanks so much!

From: Tao Li <t...@zillow.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Friday, July 30, 2021 at 3:53 PM
To: Andrew Pilloud <apill...@google.com>, "user@beam.apache.org" 
<user@beam.apache.org>
Cc: Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
Subject: Re: Perf issue with Beam on spark (spark runner)

Thanks everyone for your help.

We actually did another round of perf comparison between Beam (on Spark) and 
native Spark, without any projection/filtering in the query (to rule out the 
“predicate pushdown” factor).

Beam with the Spark runner still takes 3-5x as long as native Spark, and the 
cause is https://issues.apache.org/jira/browse/BEAM-12646 according to the 
Spark metrics. The Spark runner is pretty much the bottleneck.

<image001.png>

From: Andrew Pilloud <apill...@google.com>
Date: Thursday, July 29, 2021 at 2:11 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Cc: Tao Li <t...@zillow.com>, Kyle Weaver <kcwea...@google.com>, Yuchu Cao 
<yuc...@trulia.com>
Subject: Re: Perf issue with Beam on spark (spark runner)

Actually, ParquetIO got pushdown in Beam SQL starting at v2.29.0.

Andrew

On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud <apill...@google.com> wrote:
Beam SQL doesn't currently have projection pushdown for ParquetIO (we are 
working to expand this to more IOs). Using ParquetIO's withProjection directly 
will produce better results.

On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw <rober...@google.com> wrote:
Could you try using Beam SQL [1] and see if that gives results more similar to 
your Spark SQL query? I would also be curious whether the performance is 
sufficient when using withProjection [2] to read only the auction, price, and 
bidder columns.

[1] https://beam.apache.org/documentation/dsls/sql/overview/
[2] https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.Read.html#withProjection-org.apache.avro.Schema-org.apache.avro.Schema-

On Sat, Jul 24, 2021 at 10:23 AM Tao Li <t...@zillow.com> wrote:
Thanks Robert for filing BEAM-12646. This perf issue is a blocker for us to 
adopt Beam. It would be great if the community could identify the root cause 
and share an ETA for the fix. Thanks so much!


From: Robert Bradshaw <rober...@google.com>
Date: Wednesday, July 21, 2021 at 3:51 PM
To: Tao Li <t...@zillow.com>
Cc: "user@beam.apache.org" <user@beam.apache.org>, Kyle Weaver 
<kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
Subject: Re: Perf issue with Beam on spark (spark runner)

On Wed, Jul 21, 2021 at 3:00 PM Tao Li <t...@zillow.com> wrote:
@Robert Bradshaw with the Spark API the code is actually much simpler. We are 
just calling the Spark SQL API against a Hive table: 
spark.sql(“SELECT auction, 0.82*(price) as euro, bidder FROM bid”)

There is a good chance that this is pushing the projection of those few fields 
up into the read operator, which could be a dramatic savings. You could try 
doing it manually in Beam, or use Beam SQL, which should do the same.
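As a rough sketch, assuming the input is already a schema'd PCollection<Row> (called `table` here, which is an assumption), the Beam SQL equivalent could look like:

```java
// Hedged sketch: the same query expressed with Beam SQL. "PCOLLECTION" is the
// default table name SqlTransform gives to a single input PCollection.
PCollection<Row> result = table.apply(
    SqlTransform.query(
        "SELECT auction, 0.82 * price AS euro, bidder FROM PCOLLECTION"));
```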


I think the “globally windowed GBK” optimization you are proposing is a good 
callout.

Filed https://issues.apache.org/jira/browse/BEAM-12646 to track.


From: Robert Bradshaw <rober...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Wednesday, July 21, 2021 at 1:09 PM
To: user <user@beam.apache.org>
Cc: Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
Subject: Re: Perf issue with Beam on spark (spark runner)

On Wed, Jul 21, 2021 at 12:51 PM Tao Li <t...@zillow.com> wrote:
Kyle, I don’t expect such a huge perf diff either. To your question: no, I am 
not specifying withProjection or withSplit for the Parquet reader.

Are you doing so in your Spark code?

Below is my parquet read code:

PCollection<FileIO.ReadableFile> files = pipeline
        .apply(FileIO.match().filepattern(beamRequiredPath))
        .apply(FileIO.readMatches());

PCollection<Row> table = files
        .apply(ParquetIO
                .readFiles(avroSchema)
                .withConfiguration(ImmutableMap.of(
                        AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")))
        .apply(MapElements
                .into(TypeDescriptors.rows())
                .via(AvroUtils.getGenericRecordToRowFunction(
                        AvroUtils.toBeamSchema(avroSchema))))
        .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));


According to my investigation, it looks like the call stack below is very 
computation intensive and causes a lot of GC time. The stack appears to come 
from Spark runner code.

This does look inordinately expensive. I wonder if it would make sense to 
optimize the globally windowed GBK as some other runners do.


<image001.png>

From: Kyle Weaver <kcwea...@google.com>
Date: Tuesday, July 20, 2021 at 3:57 PM
To: Tao Li <t...@zillow.com>
Cc: "user@beam.apache.org" <user@beam.apache.org>, Yuchu Cao 
<yuc...@trulia.com>
Subject: Re: Perf issue with Beam on spark (spark runner)

Beam has its own implementation of Parquet IO, and doesn't use Spark's. It's 
possible Spark's implementation does more optimizations, though perhaps not 
enough to result in such a dramatic difference.

I'm curious how your Parquet read is configured. In particular, if 
withProjection or withSplit are set.

On Tue, Jul 20, 2021 at 3:21 PM Tao Li <t...@zillow.com> wrote:
Hi Kyle,

The ParDo (which uses the code I shared) is the only transform in my pipeline. 
The input and output are Parquet files in S3 (we are using Beam ParquetIO).

From: Kyle Weaver <kcwea...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Tuesday, July 20, 2021 at 2:13 PM
To: "user@beam.apache.org" <user@beam.apache.org>
Cc: Yuchu Cao <yuc...@trulia.com>
Subject: Re: Perf issue with Beam on spark (spark runner)

The DoFn you shared is simple enough that it seems unlikely to be the 
performance bottleneck here.

Can you share more information about your complete pipeline? What other 
transforms are there? What sources/sinks are you using?

On Tue, Jul 20, 2021 at 2:02 PM Tao Li <t...@zillow.com> wrote:
Hi Beam community,

We are seeing a serious perf issue with Beam on the Spark runner compared with 
a native Spark app. Can you please provide some help?

The Beam-on-Spark app takes 8-10 min, whereas the native Spark app takes only 
2 min. Below is the Spark UI, from which you can see that the flatMapToPair 
method is very time consuming. Is this method call coming from the Spark 
runner?

<image001.png>

I suspect this is caused by high GC time. See “GC Time” column below:

<image002.png>


The Beam code is really simple, just per-row processing.

public class CalcFn extends DoFn<Row, Row> {
    protected Logger log = LoggerFactory.getLogger(this.getClass());
    private final Schema schema;

    public CalcFn(Schema schema) {
        this.schema = schema;
    }

    @ProcessElement
    public void processElement(@Element Row row, OutputReceiver<Row> receiver) {
        Long auction_value = (Long) row.getBaseValue("auction");
        Long bid_value = (Long) row.getBaseValue("bidder");
        Long price = (Long) row.getBaseValue("price");
        Double euro = price * 0.82;

        receiver.output(Row.withSchema(schema)
                .addValues(auction_value, euro, bid_value)
                .build());
    }
}

