Yup, thanks Ted. My getPartition() method had a bug: a signed int was being
taken modulo the number of partitions, so a negative key value produced a
negative partition ID. Fixed that.
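
For reference, here is a minimal sketch of the corrected partitioner (a
drop-in replacement for the CustomPartitioner skeleton quoted below; NewKey
and getGsMinusURL() come from that skeleton, and Math.floorMod needs Java 8):

private static class CustomPartitioner extends Partitioner {
  // Fixing the partition count at construction time means getPartition()
  // never sees an uninitialized (zero) value.
  private final int numPartitions = 40;

  @Override
  public int numPartitions() {
    return numPartitions;
  }

  @Override
  public int getPartition(Object o) {
    NewKey newKey = (NewKey) o;
    // Math.floorMod always returns a value in [0, numPartitions),
    // unlike %, which keeps the sign of the dividend.
    return Math.floorMod((int) newKey.getGsMinusURL(), numPartitions);
  }
}

On Java 7, ((x % numPartitions) + numPartitions) % numPartitions achieves the
same thing; Spark's own HashPartitioner uses a non-negative modulo helper for
the same reason.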

Thanks,
Ashish

On Thu, Sep 10, 2015 at 10:44 AM, Ted Yu <yuzhih...@gmail.com> wrote:

> Here is a snippet of ExternalSorter.scala where the
> ArrayIndexOutOfBoundsException was thrown:
>
>     while (iterator.hasNext) {
>       val partitionId = iterator.nextPartition()
>       iterator.writeNext(partitionWriters(partitionId))
>     }
> Meaning, partitionId was negative.
> Execute the following and examine the value of i:
>
>     int i = -78 % 40;
>
> You will see how your getPartition() method should be refined to prevent
> this exception.
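>
> As a concrete check (RemainderCheck is a hypothetical class name, shown only
> to illustrate Java's remainder semantics), the following prints -38, because
> % keeps the sign of the dividend; any negative partition id then indexes
> partitionWriters and triggers the exception above:
>
>     public class RemainderCheck {
>       public static void main(String[] args) {
>         int i = -78 % 40;
>         System.out.println(i);  // prints -38
>       }
>     }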
>
> On Thu, Sep 10, 2015 at 8:52 AM, Ashish Shenoy <ashe...@instartlogic.com>
> wrote:
>
>> I am using spark-1.4.1
>>
>> Here's the skeleton code:
>>
>> JavaPairRDD<NewKey, ExportObject> rddPair =
>>   rdd.repartitionAndSortWithinPartitions(
>>   new CustomPartitioner(), new ExportObjectComparator())
>>     .persist(StorageLevel.MEMORY_AND_DISK_SER());
>>
>> ...
>>
>> @SuppressWarnings("serial")
>> private static class CustomPartitioner extends Partitioner {
>>   int numPartitions;
>>   @Override
>>   public int numPartitions() {
>>     numPartitions = 40;
>>     return numPartitions;
>>   }
>>
>>   @Override
>>   public int getPartition(Object o) {
>>     NewKey newKey = (NewKey) o;
>>     return (int) newKey.getGsMinusURL() % numPartitions;
>>   }
>> }
>>
>> ...
>>
>> @SuppressWarnings("serial")
>> private static class ExportObjectComparator
>>   implements Serializable, Comparator<NewKey> {
>>   @Override
>>   public int compare(NewKey o1, NewKey o2) {
>>     if (o1.hits == o2.hits) {
>>       return 0;
>>     } else if (o1.hits > o2.hits) {
>>       return -1;
>>     } else {
>>       return 1;
>>     }
>>   }
>>
>> }
>>
>> ...
>>
>>
>>
>> Thanks,
>> Ashish
>>
>> On Wed, Sep 9, 2015 at 5:13 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>
>>> Which release of Spark are you using?
>>>
>>> Can you show the skeleton of your partitioner and comparator?
>>>
>>> Thanks
>>>
>>>
>>>
>>> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy <ashe...@instartlogic.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to sort a pair RDD
>>> using repartitionAndSortWithinPartitions(), where the key is a custom
>>> class [not a Java primitive], with a custom partitioner on that key and a
>>> custom comparator. However, it fails consistently:
>>>
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 18 in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in
>>> stage 1.0 (TID 202, 172.16.18.25):
>>> java.lang.ArrayIndexOutOfBoundsException: -78
>>>         at
>>> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
>>>         at
>>> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
>>>         at
>>> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
>>>         at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
>>>         at
>>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>         at org.apache.spark.scheduler.Task.run(Task.scala:70)
>>>         at
>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>         at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>         at java.lang.Thread.run(Thread.java:745)
>>>
>>> Driver stacktrace:
>>>         at org.apache.spark.scheduler.DAGScheduler.org
>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>>>         at
>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>         at
>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>>>         at scala.Option.foreach(Option.scala:236)
>>>         at
>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>>>         at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
>>>         at
>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>>>         at
>>> org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>
>>> I also persist the RDD using the "memory and disk" storage level. The
>>> stack trace above comes from Spark's code, not my application code. Can
>>> you please point out what I am doing wrong?
>>>
>>> Thanks,
>>> Ashish
>>>
>>>
>>
>
