Here is a snippet of ExternalSorter.scala where the ArrayIndexOutOfBoundsException was thrown:

    while (iterator.hasNext) {
      val partitionId = iterator.nextPartition()
      iterator.writeNext(partitionWriters(partitionId))
    }

Meaning, partitionId was negative. Execute the following and examine the value of i:

    int i = -78 % 40;

You will see how your getPartition() method should be refined to prevent this exception: in Java the % operator keeps the sign of its left operand, so a negative key value produces a negative partition id.
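For example, here is a minimal sketch of a partitioner whose getPartition() can never return a negative id. NewKey and getGsMinusURL() are taken from the skeleton quoted below; the class name, the constructor, and the assumption that getGsMinusURL() returns a long are illustrative rather than a drop-in fix:

    import org.apache.spark.Partitioner;

    public class NonNegativePartitioner extends Partitioner {
      private final int numPartitions;

      public NonNegativePartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
      }

      @Override
      public int numPartitions() {
        return numPartitions;
      }

      @Override
      public int getPartition(Object key) {
        // NewKey is the custom key class from the skeleton quoted below.
        NewKey newKey = (NewKey) key;
        long raw = newKey.getGsMinusURL();
        // Java's % keeps the sign of its left operand, so (raw % numPartitions)
        // is negative whenever raw is negative. Shift the remainder into
        // the range [0, numPartitions) before returning it.
        return (int) (((raw % numPartitions) + numPartitions) % numPartitions);
      }
    }

It would be constructed as new NonNegativePartitioner(40) in the call to repartitionAndSortWithinPartitions(); on Java 8+ the return statement can equivalently be written as (int) Math.floorMod(raw, numPartitions).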
On Thu, Sep 10, 2015 at 8:52 AM, Ashish Shenoy <ashe...@instartlogic.com> wrote:

> I am using spark-1.4.1
>
> Here's the skeleton code:
>
> JavaPairRDD<NewKey, ExportObject> rddPair =
>     rdd.repartitionAndSortWithinPartitions(
>             new CustomPartitioner(), new ExportObjectComparator())
>        .persist(StorageLevel.MEMORY_AND_DISK_SER());
>
> ...
>
> @SuppressWarnings("serial")
> private static class CustomPartitioner extends Partitioner {
>   int numPartitions;
>
>   @Override
>   public int numPartitions() {
>     numPartitions = 40;
>     return numPartitions;
>   }
>
>   @Override
>   public int getPartition(Object o) {
>     NewKey newKey = (NewKey) o;
>     return (int) newKey.getGsMinusURL() % numPartitions;
>   }
> }
>
> ...
>
> @SuppressWarnings("serial")
> private static class ExportObjectComparator
>     implements Serializable, Comparator<NewKey> {
>   @Override
>   public int compare(NewKey o1, NewKey o2) {
>     if (o1.hits == o2.hits) {
>       return 0;
>     } else if (o1.hits > o2.hits) {
>       return -1;
>     } else {
>       return 1;
>     }
>   }
> }
>
> ...
>
> Thanks,
> Ashish
>
> On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Which release of Spark are you using ?
>>
>> Can you show skeleton of your partitioner and comparator ?
>>
>> Thanks
>>
>> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy <ashe...@instartlogic.com> wrote:
>>
>> Hi,
>>
>> I am trying to sort a RDD pair using repartitionAndSortWithinPartitions()
>> for my key [which is a custom class, not a java primitive] using a custom
>> partitioner on that key and a custom comparator.
>> However, it fails consistently:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>> 18 in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in
>> stage 1.0 (TID 202, 172.16.18.25): java.lang.ArrayIndexOutOfBoundsException: -78
>>         at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>         at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>         at scala.Option.foreach(Option.scala:236)
>>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> I also persist the RDD using the "memory and disk" storage level. The
>> stack trace above comes from Spark's code and not my application code.
>> Can you pls point out what I am doing wrong?
>>
>> Thanks,
>> Ashish