Yes, there is. But the RDD is more than 10 TB, and compression does not help.
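
For what it's worth, Spark can also compress cached RDD blocks itself; a minimal sketch (spark.rdd.compress only affects serialized storage levels such as MEMORY_ONLY_SER, and the conf/variable names here are only illustrative, not from the code below):

    SparkConf conf = new SparkConf()
        .setAppName("dedup")                    // illustrative app name
        .set("spark.rdd.compress", "true");     // compress serialized cached partitions
    JavaSparkContext sc = new JavaSparkContext(conf);

    // compression only applies when caching with a serialized level, e.g.
    // someRdd.persist(StorageLevel.MEMORY_ONLY_SER());

With a >10 TB RDD, though, even compressed blocks will not come close to fitting in memory.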

On Wed, Jul 15, 2015 at 8:36 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> bq. serializeUncompressed()
>
> Is there a method which enables compression?
>
> Just wondering if that would reduce the memory footprint.
>
> Cheers
>
> On Wed, Jul 15, 2015 at 8:06 AM, Saeed Shahrivari <
> saeed.shahriv...@gmail.com> wrote:
>
>> I use a simple map/reduce step in a Java/Spark program to remove duplicate
>> documents from a large (10 TB compressed) sequence file containing HTML
>> pages. Here is the partial code:
>>
>> JavaPairRDD<BytesWritable, NullWritable> inputRecords =
>>     sc.sequenceFile(args[0], BytesWritable.class, NullWritable.class)
>>       .coalesce(numMaps);
>>
>> JavaPairRDD<String, AteshProtos.CacheDoc> hashDocs =
>>     inputRecords.mapToPair(t -> {
>>         // the original snippet used an undefined 'doc'; presumably the
>>         // document is parsed from the record bytes, e.g.:
>>         AteshProtos.CacheDoc doc = AteshProtos.CacheDoc.parseFrom(t._1().copyBytes());
>>         // key each document by the base64-encoded SHA-1 hash
>>         return new Tuple2<>(BaseEncoding.base64()
>>             .encode(Hashing.sha1()
>>                 .hashString(doc.getUrl(), Charset.defaultCharset()).asBytes()), doc);
>>     });
>>
>> JavaPairRDD<BytesWritable, NullWritable> byteArrays =
>>     hashDocs.reduceByKey((a, b) ->
>>             // keep the copy with the shorter URL
>>             a.getUrl().length() < b.getUrl().length() ? a : b, numReds)
>>         .mapToPair(t -> new Tuple2<>(
>>             new BytesWritable(PentV3.buildFromMessage(t._2()).serializeUncompressed()),
>>             NullWritable.get()));
>>
>>
>> The logic is simple: the map generates a SHA-1 signature from the HTML, and
>> in the reduce phase we keep the page that has the shortest URL. However,
>> after running for 2-3 hours the application crashes due to a memory issue.
>> Here is the exception:
>>
>> 15/07/15 18:24:05 WARN scheduler.TaskSetManager: Lost task 267.0 in stage 
>> 0.0 (TID 267, psh-11.nse.ir): java.lang.OutOfMemoryError: GC overhead limit 
>> exceeded
>>
>>
>> It seems that the map function keeps the hashDocs RDD in memory, and when an
>> executor's memory fills up, the application crashes. Persisting the map
>> output to disk solves the problem; adding the following line between the map
>> and the reduce fixes the issue:
>>
>> hashDocs.persist(StorageLevel.DISK_ONLY());
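>>
>> In context, the pipeline with the workaround looks roughly like this (a
>> sketch; the map and reduce lambdas are the ones shown above, elided here):
>>
>> JavaPairRDD<String, AteshProtos.CacheDoc> hashDocs =
>>     inputRecords.mapToPair(/* SHA-1 keying, as above */ ...);
>>
>> hashDocs.persist(StorageLevel.DISK_ONLY());   // store blocks on disk only, nothing cached in memory
>>
>> JavaPairRDD<BytesWritable, NullWritable> byteArrays =
>>     hashDocs.reduceByKey(/* shortest URL wins, as above */ ..., numReds)
>>         .mapToPair(/* serialize back to BytesWritable, as above */ ...);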
>>
>>
>> Is this a bug in Spark?
>>
>> How can I tell Spark not to keep even a bit of the RDD in memory?
>>
>>
>> Thanks
>>
>>
>>
>
