So I tried to set the Parquet compression codec to lzo, but Hadoop does not ship the LZO native libraries, while the LZ4 natives are included. Yet I could not set the codec to lz4; the setting only accepts lzo.

Any solution here?

Thanks,
Gavin
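For reference, a minimal sketch of how the codec is usually switched in Spark SQL (assuming the Spark 1.x SQLContext API and hypothetical paths; the config accepts uncompressed, gzip, snappy and lzo, so snappy stands in here since the LZO natives are missing):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Sketch: set the Parquet codec globally before writing.
// "lzo" would need the native libraries on every node, so snappy is used instead.
val sc = new SparkContext(new SparkConf().setAppName("parquet-codec-sketch"))
val sqlContext = new SQLContext(sc)

sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// Hypothetical paths, just to show a write that picks up the codec.
val events = sqlContext.read.parquet("/data/events/day1")
events.write.parquet("/data/events/day1_snappy")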
On Sat, Jan 9, 2016 at 12:09 AM, Gavin Yue <yue.yuany...@gmail.com> wrote:

I saw in the documentation that the value is LZO. Is it LZO or LZ4?

https://github.com/Cyan4973/lz4

Based on this benchmark, they differ quite a lot.

On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu <yuzhih...@gmail.com> wrote:

gzip is relatively slow. It consumes a lot of CPU.

snappy is faster.

LZ4 is faster than gzip and smaller than Snappy.

Cheers

On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

Thank you.

And speaking of compression, is there a big difference in performance between gzip and snappy? And why does Parquet use gzip by default?

Thanks.

On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Cycling old bits:
http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ

Gavin:
Which release of HBase did you play with?

HBase has been evolving and is getting more stable.

Cheers

On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

I used to maintain an HBase cluster. The experience with it was not happy.

I just tried querying each day's data first and deduping against the smaller set; the performance is acceptable, so I guess I will use this method.

Again, could anyone give advice about:

- Automatically determine the number of reducers for joins and group-bys: currently in Spark SQL, you need to control the degree of parallelism post-shuffle using "SET spark.sql.shuffle.partitions=[num_tasks];".

Thanks.

Gavin

On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:

bq. in a NoSQL db such as HBase

+1 :-)

On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:

One option you may want to explore is writing the event table to a NoSQL db such as HBase. One inherent problem in your approach is that you always need to load either the full data set or a defined number of partitions to see whether the event has already arrived (and there is no guarantee it is foolproof, while it leads to unnecessary loading in most cases).

--
Best Regards,
Ayan Guha

On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

Hey,

Thank you for the answer. I checked the settings you mentioned and they are all correct. I noticed that in the job there are always only 200 reducers for the shuffle read; I believe that comes from the SQL shuffle parallelism setting.

In the doc, it mentions:

- Automatically determine the number of reducers for joins and group-bys: currently in Spark SQL, you need to control the degree of parallelism post-shuffle using "SET spark.sql.shuffle.partitions=[num_tasks];".

What would be the ideal number for this setting? Is it based on the hardware of the cluster?

Thanks,

Gavin
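For reference, a minimal sketch of the setting under discussion (assuming the Spark 1.x SQLContext API; 200 is the default, which matches the 200 reducers observed above, and the sizing heuristic in the comment is an assumption rather than anything from this thread):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(new SparkConf().setAppName("shuffle-partitions-sketch"))
val sqlContext = new SQLContext(sc)

// Default is 200. A common starting point (assumption, not from the thread) is a few
// times the total number of executor cores, raised further if shuffle spills appear.
sqlContext.setConf("spark.sql.shuffle.partitions", "2000")

// Equivalent SQL form, as quoted from the documentation above:
sqlContext.sql("SET spark.sql.shuffle.partitions=2000")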
On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:

- I assume your Parquet files are compressed. Gzip or Snappy?
- What Spark version did you use? It seems to be at least 1.4. If you use spark-sql and Tungsten, you might get better performance, but Spark 1.5.2 gave me a wrong result when the data was about 300~400GB, just for a simple group-by and aggregate.
- Did you use Kryo serialization?
- You should have spark.shuffle.compress=true; verify it.
- How many tasks did you use? spark.default.parallelism=?
- What about this:
  - Read the data day by day.
  - Compute a bucket id from the timestamp, e.g., the date and hour.
  - Write into different buckets (you probably need a special writer to write data efficiently without shuffling the data).
  - Run distinct for each bucket. Because each bucket is small, Spark can get it done faster than having everything in one run.
- I think using groupBy (userId, timestamp) might be better than distinct; I guess distinct() will compare every field (see the sketch after this list).
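A minimal sketch of that groupBy-style dedup, assuming the Spark 1.4+ DataFrame API and the event columns Gavin lists further down the thread (UserID, EventType, EventKey, TimeStamp, MetaData); keeping the first MetaData per key only matches distinct() if duplicate rows are identical in every field:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.first

// Dedup on the identifying columns only, instead of distinct() over every field.
def dedupByKey(events: DataFrame): DataFrame =
  events
    .groupBy("UserID", "EventType", "EventKey", "TimeStamp")
    .agg(first("MetaData").as("MetaData"))

// In Spark 1.4+, dropDuplicates over the same key columns is an equivalent shortcut:
// events.dropDuplicates(Seq("UserID", "EventType", "EventKey", "TimeStamp"))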
On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

And the most frequent operation I am going to do is finding the UserIDs that have certain events, then retrieving all the events associated with those UserIDs.

In this case, how should I partition to speed up the process?

Thanks.

On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

Hey Ted,

The Event table is like this: UserID, EventType, EventKey, TimeStamp, MetaData. I just parse it from JSON and save it as Parquet; I did not change the partitioning.

Annoyingly, every day's incoming event data has duplicates with the other days. The same event could show up on Day 1 and Day 2 and probably Day 3.

I only want to keep a single Event table, yet each day comes with so many duplicates.

Is there a way I could just insert into Parquet and, if a duplicate is found, simply ignore it?

Thanks,
Gavin

On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:

Is your Parquet data source partitioned by date?

Can you dedup within partitions?

Cheers

On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

I tried it on three days' data. The total input is only 980GB, but the shuffle write is about 6.2TB, and then the job failed during the shuffle read step, which should be another 6.2TB of shuffle read.

I think that to dedup, the shuffle cannot be avoided. Is there anything I could do to stabilize this process?

Thanks.

On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:

Hey,

I have every day's Event table and want to merge them into a single Event table, but there are so many duplicates among each day's data.

I use Parquet as the data source. What I am doing now is

EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file")

Each day's events are stored in their own Parquet file.

But it failed at stage 2, which kept losing the connection to one executor. I guess this is due to a memory issue.

Any suggestion on how to do this efficiently?

Thanks,
Gavin
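Parquet itself has no insert-if-absent, so a common workaround for the question above combines Ted's per-partition idea with the day-window dedup Gavin mentions elsewhere in the thread: keep one directory per day, dedup the incoming day against only the few recent days it can overlap, and rewrite that slice rather than running distinct() over the full history. A rough sketch, assuming the Spark 1.4+ DataFrame API, hypothetical paths, and the key columns above; duplicates older than the window would still slip through:

import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical layout: one Parquet directory per day, e.g. /data/events/2016-01-08.
def mergeNewDay(sqlContext: SQLContext,
                newDayPath: String,
                recentDayPaths: Seq[String],
                outputPath: String): Unit = {
  val keyCols = Seq("UserID", "EventType", "EventKey", "TimeStamp")

  val newDay: DataFrame = sqlContext.read.parquet(newDayPath)
  val recentDays: DataFrame = recentDayPaths
    .map(p => sqlContext.read.parquet(p))
    .reduce(_ unionAll _)

  // Key-based dedup over a small window shuffles far less than distinct() over everything.
  recentDays.unionAll(newDay)
    .dropDuplicates(keyCols)
    .write
    .parquet(outputPath)
}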