I used to maintain an HBase cluster, and the experience was not a happy one. I just tried querying each day's data first and deduplicating the smaller per-day sets; the performance is acceptable, so I think I will use this method.

Again, could anyone give advice about:

- Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.

Thanks.
Gavin
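There is no single ideal value for spark.sql.shuffle.partitions; it depends on how much data is shuffled and on the cluster. A rough sizing sketch, assuming a Spark 1.5/1.6 spark-shell with sqlContext in scope, a rule-of-thumb target of roughly 200 MB of shuffle data per reduce task (an assumption on my part, not something stated in this thread), and a placeholder core count; the 6.2 TB figure comes from the three-day run reported further down the thread:

    // All three inputs below are assumptions to replace with real numbers.
    val totalShuffleBytes  = 6.2e12              // ~6.2 TB shuffle write, from the 3-day run below
    val targetBytesPerTask = 200L * 1024 * 1024  // aim for roughly 200 MB per reduce task
    val totalExecutorCores = 200                 // placeholder: use the cluster's real core count

    // Take the larger of "volume / target task size" and a few tasks per core.
    val numShufflePartitions =
      math.max((totalShuffleBytes / targetBytesPerTask).toInt, 3 * totalExecutorCores)

    // Applies to subsequent Spark SQL shuffles (joins, group-bys, distinct).
    sqlContext.setConf("spark.sql.shuffle.partitions", numShufflePartitions.toString)
    // or in SQL: SET spark.sql.shuffle.partitions=<that number>;

Note that this only changes the number of post-shuffle tasks; it does not reduce the total amount of data shuffled.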
On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. in a NoSQL db such as HBase
>
> +1 :-)
>
> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> One option you may want to explore is writing the event table in a NoSQL db such as HBase. One inherent problem in your approach is that you always need to load either the full data set or a defined number of partitions to see whether the event has already arrived (and there is no guarantee it is foolproof, but it leads to unnecessary loading in most cases).
>>
>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> Thank you for the answer. I checked the settings you mentioned and they are all correct. I noticed that in the job there are always only 200 reducers for the shuffle read; I believe that is the SQL shuffle parallelism setting.
>>>
>>> In the doc, it mentions:
>>>
>>> - Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.
>>>
>>> What would be the ideal number for this setting? Is it based on the hardware of the cluster?
>>>
>>> Thanks,
>>> Gavin
>>>
>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com> wrote:
>>>
>>>> - I assume your Parquet files are compressed. Gzip or Snappy?
>>>> - What Spark version did you use? It seems to be at least 1.4. If you use spark-sql and Tungsten, you might get better performance, but Spark 1.5.2 gave me a wrong result when the data was about 300~400GB, just for a simple group-by and aggregate.
>>>> - Did you use Kryo serialization?
>>>> - You should have spark.shuffle.compress=true; verify it.
>>>> - How many tasks did you use? spark.default.parallelism=?
>>>> - What about this:
>>>>   - Read the data day by day.
>>>>   - Compute a bucket id from the timestamp, e.g., the date and hour.
>>>>   - Write into different buckets (you probably need a special writer to write the data efficiently without shuffling it).
>>>>   - distinct for each bucket. Because each bucket is small, Spark can get it done faster than having everything in one run.
>>>> - I think using groupBy (userId, timestamp) might be better than distinct. I guess distinct() will compare every field.
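A minimal sketch of the bucket-then-dedup idea suggested above, assuming Spark 1.5+ DataFrame APIs with sqlContext in scope, a TimeStamp column holding epoch milliseconds, and placeholder paths; the column names follow the schema Gavin gives further down the thread:

    import org.apache.spark.sql.functions._

    val events = sqlContext.read.parquet("/events/raw/2016-01-08")

    // Derive a date-and-hour bucket id from the timestamp (assumed to be epoch milliseconds).
    val bucketed = events.withColumn(
      "bucket", from_unixtime(col("TimeStamp") / 1000, "yyyy-MM-dd-HH"))

    // Writing partitioned by the bucket id puts each bucket in its own directory.
    bucketed.write.partitionBy("bucket").parquet("/events/bucketed")

    // Each bucket is small, so per-bucket deduplication stays cheap.
    val oneBucket = sqlContext.read
      .parquet("/events/bucketed/bucket=2016-01-08-14")
      .dropDuplicates(Seq("UserID", "EventType", "EventKey", "TimeStamp"))

Because the bucket id is derived from the event's own timestamp, two copies of the same event always land in the same bucket, so deduplicating bucket by bucket removes the same duplicates that a global distinct() would.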
>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>
>>>>> And the most frequent operation I am going to do is find the UserIDs that have certain events, then retrieve all the events associated with each UserID.
>>>>>
>>>>> In this case, how should I partition to speed up the process?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>>
>>>>>> Hey Ted,
>>>>>>
>>>>>> The Event table is like this: UserID, EventType, EventKey, TimeStamp, MetaData. I just parse it from JSON and save it as Parquet; I did not change the partitioning.
>>>>>>
>>>>>> Annoyingly, every day's incoming Event data has duplicates among each other. The same event could show up in Day1 and Day2 and probably Day3.
>>>>>>
>>>>>> I only want to keep a single Event table, and each day comes with so many duplicates.
>>>>>>
>>>>>> Is there a way I could just insert into Parquet and, if a duplicate is found, simply ignore it?
>>>>>>
>>>>>> Thanks,
>>>>>> Gavin
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Is your Parquet data source partitioned by date?
>>>>>>>
>>>>>>> Can you dedup within partitions?
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I tried on three days' data. The total input is only 980GB, but the shuffle write is about 6.2TB, and then the job failed during the shuffle read step, which should be another 6.2TB of shuffle read.
>>>>>>>>
>>>>>>>> I think that for dedup the shuffle cannot be avoided. Is there anything I could do to stabilize this process?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> I have every day's Event table and want to merge them into a single Event table, but there are so many duplicates among each day's data.
>>>>>>>>>
>>>>>>>>> I use Parquet as the data source. What I am doing now is
>>>>>>>>>
>>>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file").
>>>>>>>>>
>>>>>>>>> Each day's events are stored in their own Parquet file.
>>>>>>>>>
>>>>>>>>> But it failed at stage 2, which keeps losing the connection to one executor. I guess this is due to a memory issue.
>>>>>>>>>
>>>>>>>>> Any suggestion on how to do this efficiently?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Gavin
>>
>> --
>> Best Regards,
>> Ayan Guha
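For reference, a rough sketch of the approach Gavin settles on at the top of the thread (deduplicate each day on its own first, then merge the already-reduced sets), again assuming Spark 1.5-era DataFrame APIs with sqlContext in scope; the paths are placeholders and the choice of dedup key columns (everything except MetaData) is an assumption:

    // Hypothetical per-day Parquet paths; replace with the real layout.
    val dayPaths = Seq(
      "/events/2016-01-06",
      "/events/2016-01-07",
      "/events/2016-01-08")

    // Assumed dedup key: every column except the free-form MetaData field.
    val dedupKeys = Seq("UserID", "EventType", "EventKey", "TimeStamp")

    // Dedup each day separately (small shuffles), then fold the reduced sets together,
    // deduping again at each merge instead of running one huge distinct() at the end.
    val merged = dayPaths
      .map(path => sqlContext.read.parquet(path).dropDuplicates(dedupKeys))
      .reduce((a, b) => a.unionAll(b).dropDuplicates(dedupKeys))

    merged.write.parquet("/events/merged")

Deduplicating on a key subset rather than distinct() over all columns also lines up with Benyi's point that distinct() compares every field.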