> serializeUncompressed()

Is there a method that enables compression?

Just wondering if that would reduce the memory footprint.
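
If serializeUncompressed() just returns a byte[], one workaround might be to
gzip those bytes before wrapping them in the BytesWritable. A rough sketch --
only serializeUncompressed() and PentV3 are taken from your snippet; the
GzipUtil helper below is hypothetical, not from any library you mentioned:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    class GzipUtil {
        // Hypothetical helper: gzip-compress the serialized message bytes
        // before they are wrapped in a BytesWritable.
        static byte[] gzip(byte[] raw) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream(raw.length);
            try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
                gz.write(raw);
            }
            return bos.toByteArray();
        }
    }

    // Possible usage inside the final mapToPair, replacing the plain bytes:
    //   new BytesWritable(GzipUtil.gzip(PentV3.buildFromMessage(t._2).serializeUncompressed()))

That trades some CPU for smaller records, so whether it actually reduces the
memory pressure depends on where the memory is going.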

Cheers

On Wed, Jul 15, 2015 at 8:06 AM, Saeed Shahrivari <
saeed.shahriv...@gmail.com> wrote:

> I use a simple map/reduce step in a Java/Spark program to remove
> duplicate documents from a large (10 TB compressed) sequence file
> containing HTML pages. Here is the partial code:
>
> JavaPairRDD<BytesWritable, NullWritable> inputRecords =
>     sc.sequenceFile(args[0], BytesWritable.class, NullWritable.class)
>         .coalesce(numMaps);
>
>
> JavaPairRDD<String, AteshProtos.CacheDoc> hashDocs = inputRecords.mapToPair(t -> {
>     // doc is the CacheDoc decoded from t._1 (decoding elided in this snippet)
>     return new Tuple2<>(BaseEncoding.base64()
>         .encode(Hashing.sha1().hashString(doc.getUrl(), Charset.defaultCharset()).asBytes()),
>         doc);
> });
>
>
> JavaPairRDD<BytesWritable, NullWritable> byteArrays =
>     hashDocs.reduceByKey((a, b) -> a.getUrl().length() < b.getUrl().length() ? a : b, numReds)
>         .mapToPair(t -> new Tuple2<>(
>             new BytesWritable(PentV3.buildFromMessage(t._2).serializeUncompressed()),
>             NullWritable.get()));
>
>
> The logic is simple. The map generates a SHA-1 signature from the HTML, and
> in the reduce phase we keep the page that has the shortest URL. However,
> after running for 2-3 hours the application crashes due to a memory issue.
> Here is the exception:
>
> 15/07/15 18:24:05 WARN scheduler.TaskSetManager: Lost task 267.0 in stage 0.0 
> (TID 267, psh-11.nse.ir): java.lang.OutOfMemoryError: GC overhead limit 
> exceeded
>
>
> It seems that the map function keeps the hashDocs RDD in memory, and when an
> executor's memory fills up, the application crashes. Persisting the map output
> to disk solves the problem; adding the following line between the map and the
> reduce fixes it:
>
> hashDocs.persist(StorageLevel.DISK_ONLY());
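(Side note on the persist line above: a serialized memory-plus-disk storage
level is another option if keeping part of the RDD in memory is acceptable; an
untested sketch using the same hashDocs RDD:

    hashDocs.persist(StorageLevel.MEMORY_AND_DISK_SER());

It stores partitions as serialized bytes and spills to disk only when they no
longer fit.)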
>
>
> Is this a bug in Spark?
>
> How can I tell Spark not to keep any part of an RDD in memory?
>
>
> Thanks
>
>
>
