bq. serializeUncompressed()

Is there a method which enables compression? Just wondering if that would
reduce the memory footprint.
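If the main concern is the executors' memory footprint rather than the on-disk
record format, one Spark-side option might be serialized, compressed caching
instead of DISK_ONLY. A rough, untested sketch (it assumes hashDocs is the RDD
you want to keep around; I don't know the PentV3/serializeUncompressed API, so
this only touches Spark's own knobs):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

SparkConf conf = new SparkConf()
        .setAppName("dedup")
        // compress serialized RDD partitions (only applies to the *_SER storage levels)
        .set("spark.rdd.compress", "true");
JavaSparkContext sc = new JavaSparkContext(conf);

// keep partitions in memory in serialized (and, with spark.rdd.compress, compressed)
// form, spilling to disk only when they do not fit
hashDocs.persist(StorageLevel.MEMORY_AND_DISK_SER());

That trades some CPU for a smaller footprint without changing what gets written out.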
Cheers

On Wed, Jul 15, 2015 at 8:06 AM, Saeed Shahrivari <saeed.shahriv...@gmail.com> wrote:

> I use a simple map/reduce step in a Java/Spark program to remove duplicated
> documents from a large (10 TB compressed) sequence file containing some html
> pages. Here is the partial code:
>
> JavaPairRDD<BytesWritable, NullWritable> inputRecords =
>         sc.sequenceFile(args[0], BytesWritable.class, NullWritable.class).coalesce(numMaps);
>
> JavaPairRDD<String, AteshProtos.CacheDoc> hashDocs = inputRecords.mapToPair(t -> {
>     cacheDocs.add(new Tuple2<>(BaseEncoding.base64()
>             .encode(Hashing.sha1().hashString(doc.getUrl(), Charset.defaultCharset()).asBytes()), doc));
> });
>
> JavaPairRDD<BytesWritable, NullWritable> byteArrays =
>         hashDocs.reduceByKey((a, b) -> a.getUrl() < b.getUrl() ? a : b, numReds)
>                 .mapToPair(t -> new Tuple2<>(
>                         new BytesWritable(PentV3.buildFromMessage(t._2).serializeUncompressed()),
>                         NullWritable.get()));
>
> The logic is simple. The map generates a SHA-1 signature from the html, and in
> the reduce phase we keep the html that has the shortest URL. However, after
> running for 2-3 hours the application crashes due to a memory issue. Here is
> the exception:
>
> 15/07/15 18:24:05 WARN scheduler.TaskSetManager: Lost task 267.0 in stage 0.0
> (TID 267, psh-11.nse.ir): java.lang.OutOfMemoryError: GC overhead limit exceeded
>
> It seems that the map function keeps the hashDocs RDD in memory, and when the
> memory of an executor fills up, the application crashes. Persisting the map
> output to disk solves the problem. Adding the following line between the map
> and the reduce fixes the issue:
>
> hashDocs.persist(StorageLevel.DISK_ONLY());
>
> Is this a bug in Spark?
>
> How can I tell Spark not to keep even a bit of the RDD in memory?
>
> Thanks
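On the output side, if the goal is to keep the result compressed on disk even
without a compressing serialize method, writing a block-compressed SequenceFile
might help. Another rough, untested sketch (GzipCodec and the args[1] output
path are just placeholders):

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;

// ask SequenceFileOutputFormat for BLOCK compression instead of per-record compression
sc.hadoopConfiguration().set("mapred.output.compression.type",
        SequenceFile.CompressionType.BLOCK.toString());

byteArrays.saveAsHadoopFile(args[1],
        BytesWritable.class, NullWritable.class,
        SequenceFileOutputFormat.class,
        GzipCodec.class);

This doesn't change the in-memory representation, but it should keep the output
roughly comparable in size to the compressed input.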