So I tried to set the Parquet compression codec to lzo, but Hadoop does not
have the LZO natives installed, while the LZ4 ones are included.
And I could not set the codec to lz4; the setting only accepts lzo, not lz4.
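
This is roughly what I am doing now (a sketch; the DataFrame name and output
path are just placeholders):

  // Spark 1.x: the Parquet codec is chosen via this SQL config. It only
  // accepts uncompressed, snappy, gzip and lzo, so lz4 is rejected.
  sqlContext.setConf("spark.sql.parquet.compression.codec", "lzo")

  // snappy and gzip need no extra native libraries, so either one works as
  // a fallback while the LZO natives are missing:
  sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

  eventDF.write.parquet("/data/events")  // placeholder DataFrame and path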

Any solution here?

Thanks,
Gavin



On Sat, Jan 9, 2016 at 12:09 AM, Gavin Yue <yue.yuany...@gmail.com> wrote:

> I saw in the documentation that the value is LZO. Is it LZO or LZ4?
>
> https://github.com/Cyan4973/lz4
>
> Based on this benchmark, they differ quite a lot.
>
>
>
> On Fri, Jan 8, 2016 at 9:55 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> gzip is relatively slow. It consumes a lot of CPU.
>>
>> snappy is faster.
>>
>> LZ4 is faster than GZIP and smaller than Snappy.
>>
>> Cheers
>>
>> On Fri, Jan 8, 2016 at 7:56 PM, Gavin Yue <yue.yuany...@gmail.com> wrote:
>>
>>> Thank you.
>>>
>>> And speaking of compression, is there a big difference in performance
>>> between gzip and snappy? And why does Parquet use gzip by default?
>>>
>>> Thanks.
>>>
>>>
>>> On Fri, Jan 8, 2016 at 6:39 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> Cycling old bits:
>>>> http://search-hadoop.com/m/q3RTtRuvrm1CGzBJ
>>>>
>>>> Gavin:
>>>> Which release of HBase did you play with?
>>>>
>>>> HBase has been evolving and is getting more stable.
>>>>
>>>> Cheers
>>>>
>>>> On Fri, Jan 8, 2016 at 6:29 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>> wrote:
>>>>
>>>>> I used to maintain an HBase cluster. The experience with it was not a
>>>>> happy one.
>>>>>
>>>>> I just tried querying each day's data first and deduping against the
>>>>> smaller set, and the performance is acceptable. So I guess I will use
>>>>> this method.
>>>>>
>>>>> Again, could anyone give advice about:
>>>>>
>>>>>    - Automatically determine the number of reducers for joins and
>>>>>    groupbys: Currently in Spark SQL, you need to control the degree of
>>>>>    parallelism post-shuffle using “SET
>>>>>    spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Gavin
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jan 8, 2016 at 6:25 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>>
>>>>>> bq. in a NoSQL db such as HBase
>>>>>>
>>>>>> +1 :-)
>>>>>>
>>>>>>
>>>>>> On Fri, Jan 8, 2016 at 6:25 PM, ayan guha <guha.a...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> One option you may want to explore is writing the event table in a
>>>>>>> NoSQL db such as HBase. One inherent problem in your approach is that
>>>>>>> you always need to load either the full data set or a defined number of
>>>>>>> partitions to see if the event has already arrived (and there is no
>>>>>>> guarantee it is foolproof, but it leads to unnecessary loading in most
>>>>>>> cases).
>>>>>>>
>>>>>>> On Sat, Jan 9, 2016 at 12:56 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey,
>>>>>>>> Thank you for the answer. I checked the settings you mentioned and
>>>>>>>> they are all correct. I noticed that in the job there are always only
>>>>>>>> 200 reducers for the shuffle read; I believe this is the SQL shuffle
>>>>>>>> parallelism setting.
>>>>>>>>
>>>>>>>> The doc mentions:
>>>>>>>>
>>>>>>>>    - Automatically determine the number of reducers for joins and
>>>>>>>>    groupbys: Currently in Spark SQL, you need to control the degree of
>>>>>>>>    parallelism post-shuffle using “SET
>>>>>>>>    spark.sql.shuffle.partitions=[num_tasks];”.
>>>>>>>>
>>>>>>>>
>>>>>>>> What would be the ideal number for this setting? Is it based on the
>>>>>>>> hardware of the cluster?
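>>>>>>>>
>>>>>>>> For what it is worth, this is roughly what I am considering (a sketch;
>>>>>>>> the executor numbers are made up, and the 2-3 tasks per core figure is
>>>>>>>> just the usual tuning-guide rule of thumb):
>>>>>>>>
>>>>>>>> val executors = 50            // made-up cluster size
>>>>>>>> val coresPerExecutor = 8
>>>>>>>> sqlContext.setConf("spark.sql.shuffle.partitions",
>>>>>>>>   (executors * coresPerExecutor * 3).toString)  // ~1200 instead of 200
>>>>>>>> // or in SQL:  SET spark.sql.shuffle.partitions=1200;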
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> Gavin
>>>>>>>>
>>>>>>>> On Fri, Jan 8, 2016 at 2:48 PM, Benyi Wang <bewang.t...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>    - I assume your Parquet files are compressed. Gzip or Snappy?
>>>>>>>>>    - What Spark version did you use? It seems at least 1.4. If you
>>>>>>>>>    use Spark SQL and Tungsten, you might get better performance, but
>>>>>>>>>    Spark 1.5.2 gave me a wrong result when the data was about
>>>>>>>>>    300~400 GB, just for a simple group-by and aggregate.
>>>>>>>>>    - Did you use Kryo serialization?
>>>>>>>>>    - You should have spark.shuffle.compress=true; verify it.
>>>>>>>>>    - How many tasks did you use? spark.default.parallelism=?
>>>>>>>>>    - What about this:
>>>>>>>>>       - Read the data day by day.
>>>>>>>>>       - Compute a bucket id from the timestamp, e.g., the date and
>>>>>>>>>       hour.
>>>>>>>>>       - Write into different buckets (you probably need a special
>>>>>>>>>       writer to write the data efficiently without shuffling it).
>>>>>>>>>       - Run distinct for each bucket. Because each bucket is small,
>>>>>>>>>       Spark can get it done faster than having everything in one run.
>>>>>>>>>       - I think using groupBy (userId, timestamp) might be better
>>>>>>>>>       than distinct. I guess distinct() will compare every field.
>>>>>>>>>       (A rough sketch of this follows below.)
>>>>>>>>>
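>>>>>>>>> A rough sketch of the bucket idea above (the column names come from
>>>>>>>>> the Event schema in the mail below; the paths, the hour-bucket format
>>>>>>>>> and the millisecond TimeStamp are assumptions, Spark 1.5+):
>>>>>>>>>
>>>>>>>>> import org.apache.spark.sql.functions._
>>>>>>>>>
>>>>>>>>> // Step 1: write one day into hour buckets (partitioned output, no
>>>>>>>>> // wide shuffle yet).
>>>>>>>>> val day = sqlContext.read.parquet("/data/events/2016-01-08")
>>>>>>>>> day.withColumn("hourBucket",
>>>>>>>>>     from_unixtime((col("TimeStamp") / 1000).cast("long"), "yyyy-MM-dd-HH"))
>>>>>>>>>   .write.partitionBy("hourBucket").parquet("/data/events_bucketed")
>>>>>>>>>
>>>>>>>>> // Step 2: dedup bucket by bucket; each bucket is small, so each
>>>>>>>>> // shuffle stays small.
>>>>>>>>> val bucket = sqlContext.read
>>>>>>>>>   .parquet("/data/events_bucketed/hourBucket=2016-01-08-00")
>>>>>>>>> bucket.dropDuplicates(Seq("UserID", "EventKey", "TimeStamp"))
>>>>>>>>>   .write.parquet("/data/events_dedup/hourBucket=2016-01-08-00")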
>>>>>>>>>
>>>>>>>>> On Fri, Jan 8, 2016 at 2:31 PM, Gavin Yue <yue.yuany...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> And the most frequent operation I am going to do is find the UserIDs
>>>>>>>>>> that have certain events, then retrieve all the events associated with
>>>>>>>>>> those UserIDs.
>>>>>>>>>>
>>>>>>>>>> In this case, how should I partition the data to speed up the process?
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>> On Fri, Jan 8, 2016 at 2:29 PM, Gavin Yue <yue.yuany...@gmail.com
>>>>>>>>>> > wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Ted,
>>>>>>>>>>>
>>>>>>>>>>> The Event table is like this: UserID, EventType, EventKey,
>>>>>>>>>>> TimeStamp, MetaData. I just parse it from JSON and save it as
>>>>>>>>>>> Parquet; I did not change the partitioning.
>>>>>>>>>>>
>>>>>>>>>>> Annoyingly, every day's incoming event data has duplicates across
>>>>>>>>>>> days. The same event could show up in Day 1 and Day 2 and probably
>>>>>>>>>>> Day 3.
>>>>>>>>>>>
>>>>>>>>>>> I only want to keep a single Event table, but each day brings in so
>>>>>>>>>>> many duplicates.
>>>>>>>>>>>
>>>>>>>>>>> Is there a way I could just insert into Parquet and, if a duplicate
>>>>>>>>>>> is found, just ignore it?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Gavin
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:18 PM, Ted Yu <yuzhih...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Is your Parquet data source partitioned by date ?
>>>>>>>>>>>>
>>>>>>>>>>>> Can you dedup within partitions ?
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:10 PM, Gavin Yue <
>>>>>>>>>>>> yue.yuany...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I tried it on three days' data. The total input is only 980 GB,
>>>>>>>>>>>>> but the shuffle write data is about 6.2 TB, and then the job failed
>>>>>>>>>>>>> during the shuffle read step, which should be another 6.2 TB of
>>>>>>>>>>>>> shuffle read.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think that to dedup, the shuffling cannot be avoided. Is there
>>>>>>>>>>>>> anything I could do to stabilize this process?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jan 8, 2016 at 2:04 PM, Gavin Yue <
>>>>>>>>>>>>> yue.yuany...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I get each day's Event table and want to merge them into a
>>>>>>>>>>>>>> single Event table. But there are so many duplicates among each
>>>>>>>>>>>>>> day's data.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I use Parquet as the data source.  What I am doing now is
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> EventDay1.unionAll(EventDay2).distinct().write.parquet("a new parquet file").
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Each day's events are stored in their own Parquet file.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But it failed at stage 2, which kept losing the connection to
>>>>>>>>>>>>>> one executor. I guess this is due to a memory issue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any suggestion how I do this efficiently?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Gavin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards,
>>>>>>> Ayan Guha
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
