Thanks for your reply, Sean.

Looks like it's happening in a map:

15/02/18 20:35:52 INFO scheduler.DAGScheduler: Submitting 100 missing tasks
from Stage 1 (MappedRDD[17] at mapToPair at
NativeMethodAccessorImpl.java:-2)

That's my initial 'parse' stage, which runs before repartitioning. It
reduces the data size significantly, so I thought it would be sensible to
run it before the repartition, which involves moving lots of data around.
In hindsight that might have been a bad idea!

So the obvious thing to try would be repartitioning before the map, making
it the first transformation. I would have done that already if I could be
sure it would succeed or fail quickly.
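
To make that concrete, the reordering I have in mind looks roughly like
this (Scala here for brevity; the bucket path, partition count and
parseLine are placeholders, not my real code):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("parse-gz"))

// Stand-in for my real parse step, which shrinks each record a lot.
def parseLine(line: String): (String, Int) = (line.take(16), line.length)

// Each gz file is unsplittable, so it arrives as a single partition.
val raw = sc.textFile("s3n://my-bucket/logs/*.gz")

// Current order: parse first, then repartition:
//   val parsed = raw.map(parseLine).repartition(400)
// Proposed order: spread the data out before the heavy map:
val parsed = raw.repartition(400).map(parseLine)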

I'm not entirely clear on how the lazy execution of transformations plays
out in the DAG. It could be that the error manifests during the mapToPair
but is actually caused by the earlier read-from-text-file stage.
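
If I've understood the laziness correctly, it behaves something like this
(a toy sketch reusing the placeholders above):

val lines  = sc.textFile("s3n://my-bucket/logs/*.gz") // nothing read yet
val parsed = lines.map(parseLine)                     // still nothing
parsed.count() // only now are the files read and decompressed; textFile
               // and map run pipelined in one stage, so a failure in the
               // read can be reported under the map's call site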

Thanks for the pointers to those compression formats. I'll give them a go
(although it's not trivial to re-encode 200 GB of data on S3, so if I can
get this working reasonably well with gzip, I'd like to).
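
If I do end up re-encoding, I assume the simplest route is a one-off job
that rewrites the data with a splittable codec, something like this
(paths are placeholders, and bzip2 is one of the formats you mentioned):

import org.apache.hadoop.io.compress.BZip2Codec

sc.textFile("s3n://my-bucket/logs/*.gz")
  .saveAsTextFile("s3n://my-bucket/logs-bz2", classOf[BZip2Codec])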

Any advice on whether this error can be worked around by repartitioning
early?

Cheers

Joe


On 19 February 2015 at 09:51, Sean Owen <so...@cloudera.com> wrote:

> gzip and zip are not splittable compression formats; bzip and lzo are.
> Ideally, use a splittable compression format.
>
> Repartitioning is not a great solution since it means a shuffle, typically.
>
> This is not necessarily related to how big your partitions are. The
> question is: when does this happen? What operation?
>
> On Thu, Feb 19, 2015 at 9:35 AM, Joe Wass <jw...@crossref.org> wrote:
> > On the advice of some recent discussions on this list, I thought I would
> > try and consume gz files directly. I'm reading them, doing a preliminary
> > map, then repartitioning, then doing normal spark things.
> >
> > As I understand it, zip files aren't readable in partitions because of
> > the format, so I thought that repartitioning would be the next best thing
> > for parallelism. I have about 200 files, some about 1GB compressed and
> > some over 2GB uncompressed.
> >
> > I'm hitting the 2GB maximum partition size. It's been discussed on this
> > list (topic: "2GB limit for partitions?", tickets SPARK-1476 and
> > SPARK-1391). Stack trace at the end. This happened at 10 hours in
> > (probably when it saw its first file). I can't just re-run it quickly!
> >
> > Does anyone have any advice? Might I solve this by re-partitioning as the
> > first step after reading the file(s)? Or is it effectively impossible to
> > read a gz file that expands to over 2GB? Does anyone have any experience
> > with this?
> >
> > Thanks in advance
> >
> > Joe
> >
> > Stack trace:
> >
> > Exception in thread "main" 15/02/18 20:44:25 INFO scheduler.TaskSetManager:
> > Lost task 5.3 in stage 1.0 (TID 283) on executor:
> > java.lang.IllegalArgumentException (Size exceeds Integer.MAX_VALUE)
> > [duplicate 6]
> > org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
> > in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage
> > 1.0: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> >         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:829)
> >         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:123)
> >         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:132)
> >         at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:517)
> >         at org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:432)
> >         at org.apache.spark.storage.BlockManager.get(BlockManager.scala:618)
> >         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
> >         at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
>
