[ 
https://issues.apache.org/jira/browse/PIG-5029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15489116#comment-15489116
 ] 

liyunzhang_intel edited comment on PIG-5029 at 9/14/16 2:18 AM:
----------------------------------------------------------------

[~rohini] and [~knoguchi]: Thanks for your interest in this skewed-key sort problem.
[~rohini]:
bq. Pig has always handled skew automatically during order by.
Yes, in MR mode Pig samples, partitions, and sorts. In Spark mode we just use 
RDD.sortByKey to implement the sort feature. Although Spark samples the data 
automatically before sorting, the benchmark test shows a poor result: the 
skewed key ends up in a single partition, so one part-xxxx file is large while 
the others are small.
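The effect described above can be reproduced with a minimal Python sketch (illustrative only, not Pig/Spark code): under range partitioning, every record with the same key lands in the same partition, so a heavily skewed key saturates a single partition no matter how well the range boundaries were sampled.

```python
from bisect import bisect_left

# Simulated range partitioning over 3 partitions: all records sharing a
# key must land in the same partition, so a skewed key overloads one.
data = ["hot"] * 9000 + ["a", "b", "c"] * 300  # "hot" is the skewed key

bounds = ["b", "c"]  # range boundaries a sampler might pick


def range_partition(key):
    # Partition index = number of boundaries less than or equal to the key.
    return bisect_left(bounds, key)


sizes = [0, 0, 0]
for key in sorted(data):
    sizes[range_partition(key)] += 1

# One partition holds all 9000 "hot" records; the others stay small.
print(sizes)  # -> [600, 300, 9000]
```

This is exactly the "one part-xxxx is large" symptom: the partition sizes cannot be balanced as long as a single key dominates the data.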
bq. Using Random can cause data loss and duplication during reruns and should be 
avoided at all costs. We have been bitten by this a lot of times.
This is the only solution I have found so far, and it shows a good performance 
improvement. Can you *explain* more about why this causes data loss and 
duplication?
I am now investigating whether 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner
 can be used to distribute the skewed key evenly in Spark mode.
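The general idea behind a skewed partitioner can be sketched as follows (a hypothetical Python sketch, not Pig's actual SkewedPartitioner implementation; the sampling plan and names are made up for illustration): a sampling pass decides how many partitions a heavy key needs, and its records are then spread over those partitions round-robin instead of all landing in one.

```python
# Hypothetical sketch: a sampling pass has decided that the heavy key
# "hot" should be spread over 3 partitions starting at partition 2.
skewed_key_plan = {"hot": (2, 3)}  # key -> (first partition, span)

_counters = {}


def skewed_partition(key, default_partition):
    if key not in skewed_key_plan:
        return default_partition
    start, span = skewed_key_plan[key]
    c = _counters.get(key, 0)
    _counters[key] = c + 1
    return start + (c % span)  # round-robin over the allocated partitions


parts = [skewed_partition("hot", 0) for _ in range(6)]
print(parts)  # -> [2, 3, 4, 2, 3, 4]
```

For a sort this is safe because records with equal keys need no particular order among themselves, so they can be split over adjacent partitions without breaking the global ordering of the output.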





> Optimize sort case when data is skewed
> --------------------------------------
>
>                 Key: PIG-5029
>                 URL: https://issues.apache.org/jira/browse/PIG-5029
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: liyunzhang_intel
>            Assignee: liyunzhang_intel
>             Fix For: spark-branch
>
>         Attachments: PIG-5029.patch, SkewedData_L9.docx
>
>
> In PigMix L9.pig
> {code}
> register $PIGMIX_JAR
> A = load '$HDFS_ROOT/page_views' using 
> org.apache.pig.test.pigmix.udf.PigPerformanceLoader()
>     as (user, action, timespent, query_term, ip_addr, timestamp,
>         estimated_revenue, page_info, page_links);
> B = order A by query_term parallel $PARALLEL;
> store B into '$PIGMIX_OUTPUT/L9out';
> {code}
> The pig physical plan will be changed to spark plan and to spark lineage:
> {code}
> [main] 2016-09-08 01:49:09,844 DEBUG converter.StoreConverter 
> (StoreConverter.java:convert(110)) - RDD lineage: (23) MapPartitionsRDD[8] at 
> map at StoreConverter.java:80 []
>  |   MapPartitionsRDD[7] at mapPartitions at SortConverter.java:58 []
>  |   ShuffledRDD[6] at sortByKey at SortConverter.java:56 []
>  +-(23) MapPartitionsRDD[3] at map at SortConverter.java:49 []
>     |   MapPartitionsRDD[2] at mapPartitions at ForEachConverter.java:64 []
>     |   MapPartitionsRDD[1] at map at LoadConverter.java:127 []
>     |   NewHadoopRDD[0] at newAPIHadoopRDD at LoadConverter.java:102 []
> {code}
> We use 
> [sortByKey|https://github.com/apache/pig/blob/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java#L56]
>  to implement the sort feature. Although 
> [RangePartitioner|https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/core/src/main/scala/org/apache/spark/Partitioner.scala#L106]
>  is used by RDD.sortByKey, and RangePartitioner samples the data to split the 
> keys into roughly equal ranges, the test result (attached document) shows that 
> one partition receives most of the keys and takes a long time to finish.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)