The problem is the code you use to test:
sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
is like the following example:
def foo: Int => Nothing = {
  throw new SparkException("test")
}
sc.parallelize(List(1, 2, 3)).map(foo).collect();
So the Spark job is never actually submitted: the call fails while evaluating
`foo`, the expression used to create the map function, before anything reaches
the executors.
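
The same evaluation order can be seen without Spark at all. A minimal sketch,
where `use` is a hypothetical stand-in for `map` (any method taking a function
by value behaves this way):

def use(f: Int => Int): Unit = println("use was called")

use(i => sys.error("lazy"))  // prints "use was called"; the body only runs when f is applied
use(sys.error("eager"))      // throws right here, before use is ever entered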
Change it to
sc.parallelize(List(1, 2, 3)).map(i => throw new SparkException("test")).collect();
And you will see the correct messages from your listener.
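
If the goal is to be notified of executor-side errors specifically, the
listener can also inspect the failure reason instead of only printing event
names. A sketch, assuming the ExceptionFailure and JobFailed types from
org.apache.spark and org.apache.spark.scheduler (check them against your
Spark version):

import org.apache.spark.ExceptionFailure
import org.apache.spark.scheduler._

sc.addSparkListener(new SparkListener() {
  // reason describes why the task ended, e.g. an exception on an executor
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = taskEnd.reason match {
    case e: ExceptionFailure => println(">>>> task failed: " + e.description)
    case _ => // Success, TaskKilled, ...
  }
  // jobResult is JobFailed(exception) when the job as a whole fails
  override def onJobEnd(jobEnd: SparkListenerJobEnd) = jobEnd.jobResult match {
    case JobFailed(exception) => println(">>>> job failed: " + exception)
    case _ => // JobSucceeded
  }
});
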
Best Regards,
Shixiong(Ryan) Zhu
2015-04-19 1:06 GMT+08:00 Praveen Balaji <[email protected]>:
> Thanks for the response, Archit. I get callbacks when I do not throw an
> exception from map.
> My use case, however, is to get callbacks for exceptions in
> transformations on executors. Do you think I'm going down the right route?
>
> Cheers
> -p
>
> On Sat, Apr 18, 2015 at 1:49 AM, Archit Thakur <[email protected]>
> wrote:
>
>> Hi Praveen,
>> Can you try once removing the throw exception in map? Do you still not get
>> the callbacks?
>> On Apr 18, 2015 8:14 AM, "Praveen Balaji" <
>> [email protected]> wrote:
>>
>>> Thanks for the response, Imran. I probably chose the wrong methods for
>>> this email. I implemented all methods of SparkListener and the only
>>> callback I get is onExecutorMetricsUpdate.
>>>
>>> Here's the complete code:
>>>
>>> ======
>>>
>>> import org.apache.spark.SparkException
>>> import org.apache.spark.scheduler._
>>>
>>> sc.addSparkListener(new SparkListener() {
>>>   override def onStageCompleted(e: SparkListenerStageCompleted) =
>>>     println(">>>> onStageCompleted");
>>>   override def onStageSubmitted(e: SparkListenerStageSubmitted) =
>>>     println(">>>> onStageSubmitted");
>>>   override def onTaskStart(e: SparkListenerTaskStart) =
>>>     println(">>>> onTaskStart");
>>>   override def onTaskGettingResult(e: SparkListenerTaskGettingResult) =
>>>     println(">>>> onTaskGettingResult");
>>>   override def onTaskEnd(e: SparkListenerTaskEnd) =
>>>     println(">>>> onTaskEnd");
>>>   override def onJobStart(e: SparkListenerJobStart) =
>>>     println(">>>> onJobStart");
>>>   override def onJobEnd(e: SparkListenerJobEnd) =
>>>     println(">>>> onJobEnd");
>>>   override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) =
>>>     println(">>>> onEnvironmentUpdate");
>>>   override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) =
>>>     println(">>>> onBlockManagerAdded");
>>>   override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) =
>>>     println(">>>> onBlockManagerRemoved");
>>>   override def onUnpersistRDD(e: SparkListenerUnpersistRDD) =
>>>     println(">>>> onUnpersistRDD");
>>>   override def onApplicationStart(e: SparkListenerApplicationStart) =
>>>     println(">>>> onApplicationStart");
>>>   override def onApplicationEnd(e: SparkListenerApplicationEnd) =
>>>     println(">>>> onApplicationEnd");
>>>   override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) =
>>>     println(">>>> onExecutorMetricsUpdate");
>>> });
>>>
>>> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>
>>> =====
>>>
>>> On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid <[email protected]>
>>> wrote:
>>>
>>>> when you start the spark-shell, it's already too late to get the
>>>> ApplicationStart event. Try listening for StageCompleted or JobEnd
>>>> instead.
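>>>>
>>>> For example, a listener added from the shell will still see per-job
>>>> events; a minimal sketch (same listener API, using the jobResult field
>>>> on SparkListenerJobEnd):
>>>>
>>>> import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
>>>>
>>>> sc.addSparkListener(new SparkListener() {
>>>>   // jobResult reports whether the job succeeded or failed
>>>>   override def onJobEnd(jobEnd: SparkListenerJobEnd) =
>>>>     println(">>>> onJobEnd: " + jobEnd.jobResult)
>>>> });
>>>>
>>>> sc.parallelize(List(1, 2, 3)).count()  // any action triggers a job, so onJobEnd fires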
>>>>
>>>> On Fri, Apr 17, 2015 at 5:54 PM, Praveen Balaji <
>>>> [email protected]> wrote:
>>>>
>>>>> I'm trying to create a simple SparkListener to get notified of errors
>>>>> on executors, but I do not get any callbacks on my SparkListener. Here's
>>>>> some simple code I'm executing in spark-shell; I still don't get any
>>>>> callbacks on my listener. Am I doing something wrong?
>>>>>
>>>>> Thanks for any clue you can send my way.
>>>>>
>>>>> Cheers
>>>>> Praveen
>>>>>
>>>>> ======
>>>>> import org.apache.spark.scheduler.SparkListener
>>>>> import org.apache.spark.scheduler.SparkListenerApplicationStart
>>>>> import org.apache.spark.scheduler.SparkListenerApplicationEnd
>>>>> import org.apache.spark.SparkException
>>>>>
>>>>> sc.addSparkListener(new SparkListener() {
>>>>>   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
>>>>>     println(">>>> onApplicationStart: " + applicationStart.appName);
>>>>>   }
>>>>>
>>>>>   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
>>>>>     println(">>>> onApplicationEnd: " + applicationEnd.time);
>>>>>   }
>>>>> });
>>>>>
>>>>> sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect();
>>>>> =======
>>>>>
>>>>> output:
>>>>>
>>>>> scala> org.apache.spark.SparkException: hshsh
>>>>> at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29)
>>>>> at $iwC$$iwC$$iwC.<init>(<console>:34)
>>>>> at $iwC$$iwC.<init>(<console>:36)
>>>>> at $iwC.<init>(<console>:38)
>>>>>
>>>>>
>>>>
>>>
>