Yup, thanks Ted. My getPartition() method had a bug where a signed (and possibly negative) int was taken modulo the number of partitions, which can produce a negative partition id. Fixed that.
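For anyone hitting the same exception, here is a minimal sketch of one way to keep the partition id non-negative (illustrative only, not necessarily the exact change made here; it reuses NewKey and getGsMinusURL() from the skeleton quoted below, and Math.floorMod requires Java 8 or later; on older JDKs, ((x % n) + n) % n gives the same result):

    // Sketch of a sign-safe partitioner. Java's % operator keeps the sign of the
    // dividend, so a negative key can produce a negative partition id;
    // Math.floorMod always returns a value in [0, numPartitions).
    // Requires org.apache.spark.Partitioner on the classpath.
    @SuppressWarnings("serial")
    private static class CustomPartitioner extends Partitioner {
      // initialized up front so getPartition() never sees a zero partition count
      private final int numPartitions = 40;

      @Override
      public int numPartitions() {
        return numPartitions;
      }

      @Override
      public int getPartition(Object o) {
        NewKey newKey = (NewKey) o;
        // floorMod of a negative key still yields a non-negative index
        return Math.floorMod((int) newKey.getGsMinusURL(), numPartitions);
      }
    }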
Thanks,
Ashish

On Thu, Sep 10, 2015 at 10:44 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Here is snippet of ExternalSorter.scala where ArrayIndexOutOfBoundsException
> was thrown:
>
>     while (iterator.hasNext) {
>       val partitionId = iterator.nextPartition()
>       iterator.writeNext(partitionWriters(partitionId))
>     }
>
> Meaning, partitionId was negative.
> Execute the following and examine the value of i:
>
>     int i = -78 % 40;
>
> You will see how your getPartition() method should be refined to prevent
> this exception.
>
> On Thu, Sep 10, 2015 at 8:52 AM, Ashish Shenoy <ashe...@instartlogic.com>
> wrote:
>
>> I am using spark-1.4.1
>>
>> Here's the skeleton code:
>>
>>     JavaPairRDD<NewKey, ExportObject> rddPair =
>>         rdd.repartitionAndSortWithinPartitions(
>>                 new CustomPartitioner(), new ExportObjectComparator())
>>            .persist(StorageLevel.MEMORY_AND_DISK_SER());
>>
>>     ...
>>
>>     @SuppressWarnings("serial")
>>     private static class CustomPartitioner extends Partitioner {
>>       int numPartitions;
>>
>>       @Override
>>       public int numPartitions() {
>>         numPartitions = 40;
>>         return numPartitions;
>>       }
>>
>>       @Override
>>       public int getPartition(Object o) {
>>         NewKey newKey = (NewKey) o;
>>         return (int) newKey.getGsMinusURL() % numPartitions;
>>       }
>>     }
>>
>>     ...
>>
>>     @SuppressWarnings("serial")
>>     private static class ExportObjectComparator
>>         implements Serializable, Comparator<NewKey> {
>>       @Override
>>       public int compare(NewKey o1, NewKey o2) {
>>         if (o1.hits == o2.hits) {
>>           return 0;
>>         } else if (o1.hits > o2.hits) {
>>           return -1;
>>         } else {
>>           return 1;
>>         }
>>       }
>>     }
>>
>>     ...
>>
>> Thanks,
>> Ashish
>>
>> On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Which release of Spark are you using ?
>>>
>>> Can you show skeleton of your partitioner and comparator ?
>>>
>>> Thanks
>>>
>>> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy <ashe...@instartlogic.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to sort a RDD pair
>>> using repartitionAndSortWithinPartitions() for my key [which is a custom
>>> class, not a java primitive] using a custom partitioner on that key and a
>>> custom comparator.
>>> However, it fails consistently:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 18 in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in
>>> stage 1.0 (TID 202, 172.16.18.25): java.lang.ArrayIndexOutOfBoundsException: -78
>>>         at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
>>>         at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
>>>         at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> Driver stacktrace:
>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>>         at scala.Option.foreach(Option.scala:236)
>>>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
>>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>>>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>
>>> I also persist the RDD using the "memory and disk" storage level. The
>>> stack trace above comes from spark's code and not my application code. Can
>>> you pls point out what I am doing wrong ?
>>>
>>> Thanks,
>>> Ashish
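As a small standalone illustration of the failure mode discussed in the thread (not code from the thread; the key value is hypothetical), Java's % operator keeps the sign of its left operand, and ExternalSorter indexes its per-partition writer array with whatever getPartition() returns:

    // The % operator can return a negative value, and Spark indexes partitionWriters
    // directly with the partition id, so a negative id surfaces as
    // ArrayIndexOutOfBoundsException with a negative index.
    public class NegativePartitionIdDemo {
      public static void main(String[] args) {
        int numPartitions = 40;
        long key = -78L;                          // stand-in for a key whose numeric value is negative

        int partitionId = (int) key % numPartitions;
        System.out.println(partitionId);          // prints -38: negative, hence out of bounds

        Object[] partitionWriters = new Object[numPartitions];
        Object writer = partitionWriters[partitionId];   // throws ArrayIndexOutOfBoundsException: -38

        // Math.floorMod((int) key, numPartitions) would instead give 2, a valid index.
      }
    }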