I used to maintain an HBase cluster. The experience was not a happy one.

I just tried querying each day's data first and deduplicating against a
smaller set. The performance is acceptable, so I guess I will use this method.
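
A minimal sketch of this idea in plain Python (not Spark code), assuming an
event counts as a duplicate when UserID, EventType, EventKey and TimeStamp all
match. The names Event, event_key and merge_new_day are made up for
illustration only:

```python
from collections import namedtuple

# Hypothetical record mirroring the columns mentioned in this thread.
Event = namedtuple(
    "Event", ["user_id", "event_type", "event_key", "timestamp", "metadata"]
)

def event_key(e):
    # Assumed identity of an event; change this to whatever defines a duplicate.
    return (e.user_id, e.event_type, e.event_key, e.timestamp)

def merge_new_day(seen_keys, new_day):
    """Return the events in new_day not seen before and record their keys."""
    fresh = []
    for e in new_day:
        k = event_key(e)
        if k not in seen_keys:
            seen_keys.add(k)
            fresh.append(e)
    return fresh

# Usage: only the small set of keys is kept around between days.
seen = set()
day1 = [Event("u1", "click", "k1", 100, "{}")]
day2 = [Event("u1", "click", "k1", 100, "{}"), Event("u2", "view", "k2", 200, "{}")]
merged = merge_new_day(seen, day1) + merge_new_day(seen, day2)
```

Only the keys have to be compared across days, not every field of every event.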

Again, could anyone give advice about:

   - Automatically determine the number of reducers for joins and groupbys:
   Currently in Spark SQL, you need to control the degree of parallelism
   post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.
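
One common rule of thumb (an assumption here, not something stated in the doc
quoted above) is to pick the partition count so that each post-shuffle
partition handles roughly 100-200MB. A rough sketch of that arithmetic in plain
Python; the function name and the 128MB target are hypothetical:

```python
import math

def suggest_shuffle_partitions(shuffle_bytes, target_bytes=128 * 1024 ** 2):
    """Rough partition count so each post-shuffle partition is ~target_bytes."""
    return max(1, math.ceil(shuffle_bytes / target_bytes))

# Example: the 6.2TB shuffle write reported in this thread.
n = suggest_shuffle_partitions(6.2 * 1024 ** 4)
print("SET spark.sql.shuffle.partitions=%d;" % n)
```

The right value likely also depends on executor memory and core count, so this
is only a starting point for experiments.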

Thanks.

Gavin

On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. in a NoSQL db such as HBase
>
> +1 :-)
>
>
> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> One option you may want to explore is writing the event table to a NoSQL db
>> such as HBase. One inherent problem with your approach is that you always
>> need to load either the full data set or a defined number of partitions to
>> see if the event has already arrived (and there is no guarantee that it is
>> foolproof, while it leads to unnecessary loading in most cases).
>>
>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com>
>> wrote:
>>
>>> Hey,
>>> Thank you for the answer. I checked the settings you mentioned; they are
>>> all correct. I noticed that the job always has only 200 reducers for
>>> shuffle read. I believe that comes from the SQL shuffle parallelism setting.
>>>
>>> In the doc, it mentions:
>>>
>>>    - Automatically determine the number of reducers for joins and
>>>    groupbys: Currently in Spark SQL, you need to control the degree of
>>>    parallelism post-shuffle using “SET
>>>    spark.sql.shuffle.partitions=[num_tasks];”.
>>>
>>>
>>> What would be the ideal number for this setting? Is it based on the
>>> hardware of the cluster?
>>>
>>>
>>> Thanks,
>>>
>>> Gavin
>>>
>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com>
>>> wrote:
>>>
>>>>
>>>>    - I assume your parquet files are compressed. Gzip or Snappy?
>>>>    - What Spark version did you use? It seems to be at least 1.4. If you
>>>>    use spark-sql and Tungsten, you might get better performance, but
>>>>    Spark 1.5.2 gave me a wrong result when the data was about 300~400GB,
>>>>    just for a simple group-by and aggregate.
>>>>    - Did you use Kryo serialization?
>>>>    - You should have spark.shuffle.compress=true; verify it.
>>>>    - How many tasks did you use? spark.default.parallelism=?
>>>>    - What about this:
>>>>       - Read the data day by day
>>>>       - compute a bucket id from timestamp, e.g., the date and hour
>>>>       - Write into different buckets (you probably need a special
>>>>       writer to write data efficiently without shuffling the data).
>>>>       - distinct for each bucket. Because each bucket is small, spark
>>>>       can get it done faster than having everything in one run.
>>>>       - I think using groupBy (userId, timestamp) might be better than
>>>>       distinct. I guess distinct() will compare every field.
>>>>
>>>>
>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>> wrote:
>>>>
>>>>> And the most frequent operation I am going to do is find the UserIDs that
>>>>> have some events, then retrieve all the events associated with each UserID.
>>>>>
>>>>> In this case, how should I partition to speed up the process?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hey Ted,
>>>>>>
>>>>>> The Event table looks like this: UserID, EventType, EventKey, TimeStamp,
>>>>>> MetaData.  I just parse it from JSON and save it as Parquet; I did not
>>>>>> change the partitioning.
>>>>>>
>>>>>> Annoyingly, every day's incoming Event data has duplicates of other
>>>>>> days' data.  The same event could show up in Day1 and Day2 and probably
>>>>>> Day3.
>>>>>>
>>>>>> I only want to keep a single Event table, and each day brings so many
>>>>>> duplicates.
>>>>>>
>>>>>> Is there a way I could just insert into Parquet and, if a duplicate is
>>>>>> found, just ignore it?
>>>>>>
>>>>>> Thanks,
>>>>>> Gavin
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>>
>>>>>>> Is your Parquet data source partitioned by date?
>>>>>>>
>>>>>>> Can you dedup within partitions?
>>>>>>>
>>>>>>> Cheers
>>>>>>>
>>>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I tried on three days' data.  The total input is only 980GB, but
>>>>>>>> the shuffle write is about 6.2TB. The job then failed during the
>>>>>>>> shuffle read step, which should be another 6.2TB of shuffle read.
>>>>>>>>
>>>>>>>> I think that to dedup, the shuffling cannot be avoided. Is there
>>>>>>>> anything I could do to stabilize this process?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> I get an Event table every day and want to merge them into a single
>>>>>>>>> Event table. But there are so many duplicates among each day's data.
>>>>>>>>>
>>>>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>>>>
>>>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file")
>>>>>>>>>
>>>>>>>>> Each day's events are stored in their own Parquet file.
>>>>>>>>>
>>>>>>>>> But it failed at stage 2, which keeps losing its connection to one
>>>>>>>>> executor. I guess this is due to a memory issue.
>>>>>>>>>
>>>>>>>>> Any suggestions on how to do this efficiently?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Gavin
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>
