> On 5 Aug 2021, at 18:17, Tao Li <t...@zillow.com> wrote:
> 
> It was a great presentation!
Thanks!

>  Regarding my perf testing, I was not doing aggregation, filtering, 
> projection or joining. I was simply reading all the fields of parquet and 
> then immediately save PCollection back to parquet.

Well, of course, if you read all fields (columns) then you don’t need column 
projection. Otherwise, it can give quite a significant performance boost, 
especially for large tables with many columns. 
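
For reference, a minimal sketch of what projection looks like with ParquetIO. This is an illustration, not your exact pipeline: the field names ("auction", "price", "bidder") are taken from earlier in this thread, and `fileSchema` / the S3 path are placeholders you would substitute:

```java
// Sketch only: assumes Beam's ParquetIO and Avro are on the classpath.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

// Projection schema: a subset of the full file schema containing only
// the columns the pipeline actually needs.
Schema projection = SchemaBuilder.record("BidProjection").fields()
    .requiredLong("auction")
    .requiredLong("price")
    .requiredLong("bidder")
    .endRecord();

PCollection<GenericRecord> rows = pipeline.apply(
    ParquetIO.read(fileSchema)                     // full file schema
        .from("s3://bucket/path/*.parquet")        // placeholder path
        .withProjection(projection, projection));  // (projection, encoder) schemas
```

The two arguments to `withProjection` are the projection schema and the encoder schema (per the `ParquetIO.Read` javadoc linked later in this thread); using the same schema for both is the common case.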


> Regarding SDF translation, is it enabled by default?

From Beam 2.30.0 release notes:

"Legacy Read transform (non-SDF based Read) is used by default for non-FnAPI 
opensource runners. Use `use_sdf_read` experimental flag to re-enable SDF based 
Read transforms 
([BEAM-10670](https://issues.apache.org/jira/browse/BEAM-10670))"
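
So on 2.30.0+ the legacy (non-SDF) Read is already the default on the Spark runner and there is nothing to pass. If you did want to re-enable SDF reads, it would look roughly like this (the flag name is from the release notes; the `spark-submit` invocation details are illustrative):

```shell
# Default on Beam 2.30.0+: legacy (non-SDF) Read, no flag needed.
# To explicitly re-enable SDF-based reads, pass the experiment:
spark-submit --class com.example.MyPipeline my-pipeline.jar \
  --runner=SparkRunner \
  --experiments=use_sdf_read
```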

—
Alexey

>  I will check out ParquetIO splittable. Thanks!
>  
> From: Alexey Romanenko <aromanenko....@gmail.com>
> Date: Thursday, August 5, 2021 at 6:40 AM
> To: Tao Li <t...@zillow.com>
> Cc: "user@beam.apache.org" <user@beam.apache.org>, Andrew Pilloud 
> <apill...@google.com>, Ismaël Mejía <ieme...@gmail.com>, Kyle Weaver 
> <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
> Subject: Re: Perf issue with Beam on spark (spark runner)
>  
> It’s very likely that Spark SQL may have much better performance because of 
> SQL push-downs and avoiding additional ser/deser operations.
>  
> At the same time, did you try to leverage "withProjection()" in ParquetIO to 
> project only the fields that you need? 
>  
> Did you use ParquetIO splittable (it's not enabled by default, fixed in [1])?
>  
> Also, using the SDF translation for Read on the Spark runner can cause performance 
> degradation as well (we noticed that in our experiments). Try using the non-SDF 
> read (if you haven't yet) [2].
>  
>  
> PS: Yesterday, on Beam Summit, we (Ismael and me) gave a related talk. I’m 
> not sure if a recording is already available but you can find the slides here 
> [3] that can be helpful.
>  
>  
> —
> Alexey
>  
> [1] https://issues.apache.org/jira/browse/BEAM-12070
> [2] https://issues.apache.org/jira/browse/BEAM-10670
> [3] https://drive.google.com/file/d/17rJC0BkxpFFL1abVL01c-D0oHvRRmQ-O/view?usp=sharing
>  
> 
> 
>> On 5 Aug 2021, at 03:07, Tao Li <t...@zillow.com> wrote:
>>  
>> @Alexey Romanenko @Ismaël Mejía I assume you are experts on the Spark runner. 
>> Can you please take a look at this thread and confirm whether this Jira covers 
>> the cause: https://issues.apache.org/jira/browse/BEAM-12646 ?
>>  
>> This perf issue is currently a blocker for me.
>>  
>> Thanks so much!
>>  
>> From: Tao Li <t...@zillow.com>
>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>> Date: Friday, July 30, 2021 at 3:53 PM
>> To: Andrew Pilloud <apill...@google.com>, "user@beam.apache.org" <user@beam.apache.org>
>> Cc: Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>  
>> Thanks everyone for your help.
>>  
>> We actually did another round of perf comparison between Beam (on spark) and 
>> native spark, without any projection/filtering in the query (to rule out the 
>> “predicate pushdown” factor).
>>  
>> Beam with the Spark runner is still taking 3-5x as long as native Spark, and 
>> according to the Spark metrics the cause is 
>> https://issues.apache.org/jira/browse/BEAM-12646. The Spark runner is pretty 
>> much the bottleneck.
>>  
>> <image001.png>
>>  
>> From: Andrew Pilloud <apill...@google.com>
>> Date: Thursday, July 29, 2021 at 2:11 PM
>> To: "user@beam.apache.org" <user@beam.apache.org>
>> Cc: Tao Li <t...@zillow.com>, Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>  
>> Actually, ParquetIO got pushdown in Beam SQL starting in v2.29.0.
>>  
>> Andrew
>>  
>> On Mon, Jul 26, 2021 at 10:05 AM Andrew Pilloud <apill...@google.com> wrote:
>>> Beam SQL doesn't currently have project pushdown for ParquetIO (we are 
>>> working to expand this to more IOs). Using ParquetIO withProjection 
>>> directly will produce better results.
>>>  
>>> On Mon, Jul 26, 2021 at 9:46 AM Robert Bradshaw <rober...@google.com> wrote:
>>>> Could you try using Beam SQL [1] and see if that gives results more similar 
>>>> to your Spark SQL query? I would also be curious whether the performance is 
>>>> sufficient when using withProjection [2] to read only the auction, price, and 
>>>> bidder columns. 
>>>>  
>>>> [1] https://beam.apache.org/documentation/dsls/sql/overview/
>>>> [2] https://beam.apache.org/releases/javadoc/2.25.0/org/apache/beam/sdk/io/parquet/ParquetIO.Read.html#withProjection-org.apache.avro.Schema-org.apache.avro.Schema-
>>>>  
>>>> On Sat, Jul 24, 2021 at 10:23 AM Tao Li <t...@zillow.com> wrote:
>>>>> Thanks Robert for filing BEAM-12646. This perf issue is a blocker for us to 
>>>>> adopt Beam. It would be great if the community could determine the root cause 
>>>>> and share an ETA for the fix. Thanks so much!
>>>>>  
>>>>>  
>>>>> From: Robert Bradshaw <rober...@google.com>
>>>>> Date: Wednesday, July 21, 2021 at 3:51 PM
>>>>> To: Tao Li <t...@zillow.com>
>>>>> Cc: "user@beam.apache.org" <user@beam.apache.org>, Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>  
>>>>> On Wed, Jul 21, 2021 at 3:00 PM Tao Li <t...@zillow.com> wrote:
>>>>>> @Robert Bradshaw with the Spark API, the code is actually much simpler. We 
>>>>>> are just calling the Spark SQL API against a Hive table: 
>>>>>> spark.sql("SELECT auction, 0.82*(price) as euro, bidder FROM bid")
>>>>>  
>>>>> Good chance that this is pushing projection of those few fields up into 
>>>>> the read operator, which could be a dramatic savings. You could try doing 
>>>>> it manually in Beam, or use Beam's SQL that should do the same. 
>>>>>  
>>>>>>  
>>>>>> I think the “globally windowed GBK” optimization you are proposing is a 
>>>>>> good callout.
>>>>>  
>>>>> Filed https://issues.apache.org/jira/browse/BEAM-12646 to track. 
>>>>>  
>>>>>>  
>>>>>> From: Robert Bradshaw <rober...@google.com>
>>>>>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>>>>>> Date: Wednesday, July 21, 2021 at 1:09 PM
>>>>>> To: user <user@beam.apache.org>
>>>>>> Cc: Kyle Weaver <kcwea...@google.com>, Yuchu Cao <yuc...@trulia.com>
>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>  
>>>>>> On Wed, Jul 21, 2021 at 12:51 PM Tao Li <t...@zillow.com> wrote:
>>>>>>> Kyle, I don’t expect such a huge perf diff either. To your question, no, 
>>>>>>> I am not specifying withProjection or withSplit for the Parquet reader.
>>>>>>  
>>>>>> Are you doing so in your Spark code? 
>>>>>>  
>>>>>>> Below is my parquet read code:
>>>>>>>  
>>>>>>> PCollection<FileIO.ReadableFile> files = pipeline
>>>>>>>     .apply(FileIO.match().filepattern(beamRequiredPath))
>>>>>>>     .apply(FileIO.readMatches());
>>>>>>>
>>>>>>> PCollection<Row> table = files
>>>>>>>     .apply(ParquetIO
>>>>>>>         .readFiles(avroSchema)
>>>>>>>         .withConfiguration(ImmutableMap.of(
>>>>>>>             AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false")))
>>>>>>>     .apply(MapElements
>>>>>>>         .into(TypeDescriptors.rows())
>>>>>>>         .via(AvroUtils.getGenericRecordToRowFunction(AvroUtils.toBeamSchema(avroSchema))))
>>>>>>>     .setCoder(RowCoder.of(AvroUtils.toBeamSchema(avroSchema)));
>>>>>>>  
>>>>>>>  
>>>>>>> According to my investigation, it looks like the call stack below is very 
>>>>>>> computation-intensive and causes a lot of GC time. The stack appears to 
>>>>>>> come from Spark runner code.
>>>>>>  
>>>>>> This does look inordinately expensive. I wonder if it would make sense 
>>>>>> to optimize the globally windowed GBK as some other runners do. 
>>>>>>  
>>>>>>>  
>>>>>>> <image001.png>
>>>>>>>  
>>>>>>> From: Kyle Weaver <kcwea...@google.com>
>>>>>>> Date: Tuesday, July 20, 2021 at 3:57 PM
>>>>>>> To: Tao Li <t...@zillow.com>
>>>>>>> Cc: "user@beam.apache.org" <user@beam.apache.org>, Yuchu Cao <yuc...@trulia.com>
>>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>>  
>>>>>>> Beam has its own implementation of Parquet IO, and doesn't use Spark's. 
>>>>>>> It's possible Spark's implementation does more optimizations, though 
>>>>>>> perhaps not enough to result in such a dramatic difference.
>>>>>>>  
>>>>>>> I'm curious how your Parquet read is configured. In particular, if 
>>>>>>> withProjection or withSplit are set.
>>>>>>>  
>>>>>>> On Tue, Jul 20, 2021 at 3:21 PM Tao Li <t...@zillow.com> wrote:
>>>>>>>> Hi Kyle,
>>>>>>>>  
>>>>>>>> The ParDo (which references the code I shared) is the only 
>>>>>>>> transformation in my pipeline. The input and output are parquet files 
>>>>>>>> in S3 (we are using beam ParquetIO).
>>>>>>>>  
>>>>>>>> From: Kyle Weaver <kcwea...@google.com>
>>>>>>>> Reply-To: "user@beam.apache.org" <user@beam.apache.org>
>>>>>>>> Date: Tuesday, July 20, 2021 at 2:13 PM
>>>>>>>> To: "user@beam.apache.org" <user@beam.apache.org>
>>>>>>>> Cc: Yuchu Cao <yuc...@trulia.com>
>>>>>>>> Subject: Re: Perf issue with Beam on spark (spark runner)
>>>>>>>>  
>>>>>>>> The DoFn you shared is simple enough that it seems unlikely to be the 
>>>>>>>> performance bottleneck here.
>>>>>>>>  
>>>>>>>> Can you share more information about your complete pipeline? What 
>>>>>>>> other transforms are there? What sources/sinks are you using?
>>>>>>>>  
>>>>>>>> On Tue, Jul 20, 2021 at 2:02 PM Tao Li <t...@zillow.com> wrote:
>>>>>>>>> Hi Beam community,
>>>>>>>>>  
>>>>>>>>> We are seeing a serious perf issue with beam using spark runner, 
>>>>>>>>> compared with writing a native spark app. Can you please provide some 
>>>>>>>>> help?
>>>>>>>>>  
>>>>>>>>> The Beam-on-Spark app is taking 8-10 min, whereas the native Spark app 
>>>>>>>>> takes only 2 min. Below is the Spark UI, from which you can see that the 
>>>>>>>>> flatMapToPair method is very time-consuming. Is this method call 
>>>>>>>>> coming from the Spark runner?
>>>>>>>>>  
>>>>>>>>> <image001.png>
>>>>>>>>>  
>>>>>>>>> I suspect this is caused by high GC time. See “GC Time” column below:
>>>>>>>>>  
>>>>>>>>> <image002.png>
>>>>>>>>>  
>>>>>>>>>  
>>>>>>>>> The beam code is really simple, just a per row processing.
>>>>>>>>>  
>>>>>>>>> public class CalcFn extends DoFn<Row, Row> {
>>>>>>>>>     protected Logger log = LoggerFactory.getLogger(this.getClass());
>>>>>>>>>     private Schema schema;
>>>>>>>>>
>>>>>>>>>     public CalcFn(Schema schema) {
>>>>>>>>>         this.schema = schema;
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @ProcessElement
>>>>>>>>>     public void processElement(@Element Row row, OutputReceiver<Row> receiver) {
>>>>>>>>>         Long auction_value = (Long) row.getBaseValue("auction");
>>>>>>>>>         Long bid_value = (Long) row.getBaseValue("bidder");
>>>>>>>>>         Long price = (Long) row.getBaseValue("price");
>>>>>>>>>         Double euro = price * 0.82;
>>>>>>>>>
>>>>>>>>>         receiver.output(Row.withSchema(schema)
>>>>>>>>>                 .addValues(auction_value, euro, bid_value).build());
>>>>>>>>>     }
>>>>>>>>> }
