I removed the custom shutdown hook and it still doesn't work. I'm using the
Kafka direct stream (DirectKafkaInputDStream).
I sometimes get java.lang.InterruptedException on Ctrl+C; other times it goes
through fine.
I have this code now:
... some stream processing ...
ssc.start()
ssc.awaitTermination()
ssc.stop(stopSparkContext = false, stopGracefully = true)
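For completeness, the stream creation elided above would look roughly like
this with the direct Kafka API (a sketch against the Spark 1.x
createDirectStream signature; broker list and topic are placeholders, and ssc
is the context from the snippet):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Receiver-less direct stream; offsets are tracked by Spark itself.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc,
  Map("metadata.broker.list" -> "broker:9092"), // placeholder brokers
  Set("my-topic"))                              // placeholder topic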
It seems there is an undocumented config parameter
spark.streaming.stopGracefullyOnShutdown (Boolean).
Based on the source code of StreamingContext.scala, where there is (abridged):

def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      ...
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      ...
    ...
  }
}

private def stopOnShutdown(): Unit = {
  val stopGracefully =
    conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
  stop(stopSparkContext = false, stopGracefully = stopGracefully)
}
I tried to set spark.streaming.stopGracefullyOnShutdown=true in SparkConf
and removed ssc.stop(stopSparkContext = false, stopGracefully = true) from
my code.
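The change amounts to this (a minimal sketch; the app name and batch interval
are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-app") // placeholder name
  .set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(conf, Seconds(60)) // placeholder interval

The same flag can also be passed at submit time with
spark-submit --conf spark.streaming.stopGracefullyOnShutdown=true.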
On Ctrl+C it seems to shut down gracefully without any error, but the job
can't be restarted: it fails with the same error, that DirectKafkaInputDStream
has not been initialized. Only when the checkpoints are deleted can it be
started again.
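In case it is related to how the context is recreated: my understanding is
that on recovery the whole DStream graph has to be defined inside the factory
passed to StreamingContext.getOrCreate, so that Spark can rebuild it from the
checkpoint. A minimal sketch of that wiring (the checkpoint path and app name
are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val conf = new SparkConf()
    .setAppName("streaming-app") // placeholder name
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
  val ssc = new StreamingContext(conf, Seconds(60)) // placeholder interval
  ssc.checkpoint("/dfs/spark/checkpoints")          // placeholder path
  // createDirectStream and all stream processing must be set up in here,
  // not outside the factory, or the graph cannot be restored on restart
  ssc
}

val ssc = StreamingContext.getOrCreate("/dfs/spark/checkpoints", createContext _)
ssc.start()
ssc.awaitTermination()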
Can anybody confirm that graceful shutdown works with
DirectKafkaInputDStream?
Here is the error output with the InterruptedException after Ctrl+C:
[2015-09-11 14:47:16,662] INFO Starting task 0.0 in stage 1.0 (TID 1,
10.29.52.111, ANY, 2026 bytes) (org.apache.spark.scheduler.TaskSetManager)
[2015-09-11 14:47:16,662] INFO ensureFreeSpace(1600) called with
curMem=6769, maxMem=556038881 (org.apache.spark.storage.MemoryStore)
[2015-09-11 14:47:16,662] INFO Block broadcast_2 stored as values in memory
(estimated size 1600.0 B, free 530.3 MB)
(org.apache.spark.storage.MemoryStore)
[2015-09-11 14:47:16,663] INFO ensureFreeSpace(1020) called with
curMem=8369, maxMem=556038881 (org.apache.spark.storage.MemoryStore)
[2015-09-11 14:47:16,663] INFO Block broadcast_2_piece0 stored as bytes in
memory (estimated size 1020.0 B, free 530.3 MB)
(org.apache.spark.storage.MemoryStore)
[2015-09-11 14:47:16,671] INFO Added broadcast_2_piece0 in memory on
10.29.52.111:51578 (size: 1020.0 B, free: 530.3 MB)
(org.apache.spark.storage.BlockManagerInfo)
[2015-09-11 14:47:16,671] INFO Created broadcast 2 from broadcast at
DAGScheduler.scala:861 (org.apache.spark.SparkContext)
[2015-09-11 14:47:16,672] INFO Submitting 2 missing tasks from
ShuffleMapStage 2 (ParallelCollectionRDD[0] at parallelize at
Aggregator.scala:174) (org.apache.spark.scheduler.DAGScheduler)
[2015-09-11 14:47:16,672] INFO Adding task set 2.0 with 2 tasks
(org.apache.spark.scheduler.TaskSchedulerImpl)
[2015-09-11 14:47:16,673] INFO Added broadcast_1_piece0 in memory on
10.29.52.111:50459 (size: 2.4 KB, free: 530.3 MB)
(org.apache.spark.storage.BlockManagerInfo)
^C[2015-09-11 14:47:19,896] INFO Invoking stop(stopGracefully=false) from
shutdown hook (org.apache.spark.streaming.StreamingContext)
[2015-09-11 14:47:19,896] INFO Stopping JobGenerator immediately
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-11 14:47:19,897] INFO Stopped timer for JobGenerator after time
1441975620000 (org.apache.spark.streaming.util.RecurringTimer)
[2015-09-11 14:47:19,897] INFO CheckpointWriter executor terminated ? true,
waited for 0 ms. (org.apache.spark.streaming.CheckpointWriter)
[2015-09-11 14:47:19,898] INFO Stopped JobGenerator
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-11 14:47:21,899] ERROR Aborting job.
(org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation)
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:503)
at
org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:559)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at
org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:927)
at
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:927)
at
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at
org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at
...AllMessagesAggregator$$anonfun$aggregate$2.apply(AllMessagesAggregator.scala:90)
at
...AllMessagesAggregator$$anonfun$aggregate$2.apply(AllMessagesAggregator.scala:73)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at
org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at
org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
[2015-09-11 14:47:21,899] INFO Stopped JobScheduler
(org.apache.spark.streaming.scheduler.JobScheduler)
[2015-09-11 14:47:21,901] ERROR Job job_201509111447_0000 aborted.
(org.apache.spark.sql.execution.datasources.DefaultWriterContainer)
[2015-09-11 14:47:21,903] INFO stopped
o.s.j.s.ServletContextHandler{/streaming,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,904] INFO stopped
o.s.j.s.ServletContextHandler{/streaming/batch,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,905] INFO stopped
o.s.j.s.ServletContextHandler{/static/streaming,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,906] INFO StreamingContext stopped successfully
(org.apache.spark.streaming.StreamingContext)
[2015-09-11 14:47:21,906] INFO Invoking stop() from shutdown hook
(org.apache.spark.SparkContext)
[2015-09-11 14:47:21,906] WARN StreamingContext has already been stopped
(org.apache.spark.streaming.StreamingContext)
[2015-09-11 14:47:21,927] INFO stopped
o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,928] INFO stopped
o.s.j.s.ServletContextHandler{/streaming/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,928] INFO stopped
o.s.j.s.ServletContextHandler{/static/sql,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,928] INFO stopped
o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,928] INFO stopped
o.s.j.s.ServletContextHandler{/SQL/execution,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,928] INFO stopped
o.s.j.s.ServletContextHandler{/SQL/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,928] INFO stopped
o.s.j.s.ServletContextHandler{/SQL,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,928] INFO stopped
o.s.j.s.ServletContextHandler{/metrics/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,928] INFO stopped
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,929] INFO stopped
o.s.j.s.ServletContextHandler{/api,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,929] INFO stopped
o.s.j.s.ServletContextHandler{/,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,929] INFO stopped
o.s.j.s.ServletContextHandler{/static,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,929] INFO stopped
o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,929] INFO stopped
o.s.j.s.ServletContextHandler{/executors/threadDump,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,929] INFO stopped
o.s.j.s.ServletContextHandler{/executors/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,929] INFO stopped
o.s.j.s.ServletContextHandler{/executors,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,929] INFO stopped
o.s.j.s.ServletContextHandler{/environment/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,930] INFO stopped
o.s.j.s.ServletContextHandler{/environment,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,930] INFO stopped
o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,930] INFO stopped
o.s.j.s.ServletContextHandler{/storage/rdd,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,930] INFO stopped
o.s.j.s.ServletContextHandler{/storage/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,930] INFO stopped
o.s.j.s.ServletContextHandler{/storage,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,930] INFO stopped
o.s.j.s.ServletContextHandler{/stages/pool/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,931] INFO stopped
o.s.j.s.ServletContextHandler{/stages/pool,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,931] INFO stopped
o.s.j.s.ServletContextHandler{/stages/stage/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,931] INFO stopped
o.s.j.s.ServletContextHandler{/stages/stage,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,931] INFO stopped
o.s.j.s.ServletContextHandler{/stages/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,931] INFO stopped
o.s.j.s.ServletContextHandler{/stages,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,931] INFO stopped
o.s.j.s.ServletContextHandler{/jobs/job/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,931] INFO stopped
o.s.j.s.ServletContextHandler{/jobs/job,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,931] INFO stopped
o.s.j.s.ServletContextHandler{/jobs/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,931] INFO stopped
o.s.j.s.ServletContextHandler{/jobs,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 14:47:21,984] INFO Stopped Spark web UI at
http://10.29.52.111:4040 (org.apache.spark.ui.SparkUI)
[2015-09-11 14:47:21,986] INFO Stopping DAGScheduler
(org.apache.spark.scheduler.DAGScheduler)
[2015-09-11 14:47:21,988] INFO ShuffleMapStage 1 (flatMap at
Aggregator.scala:62) failed in 5.327 s
(org.apache.spark.scheduler.DAGScheduler)
[2015-09-11 14:47:21,988] INFO ShuffleMapStage 2 (parallelize at
Aggregator.scala:174) failed in 5.316 s
(org.apache.spark.scheduler.DAGScheduler)
[2015-09-11 14:47:21,988] INFO Shutting down all executors
(org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend)
[2015-09-11 14:47:21,989] INFO Asking each executor to shut down
(org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend)
[2015-09-11 14:47:22,069] INFO MapOutputTrackerMasterEndpoint stopped!
(org.apache.spark.MapOutputTrackerMasterEndpoint)
[2015-09-11 14:47:22,072] INFO MemoryStore cleared
(org.apache.spark.storage.MemoryStore)
[2015-09-11 14:47:22,072] INFO BlockManager stopped
(org.apache.spark.storage.BlockManager)
[2015-09-11 14:47:22,073] INFO BlockManagerMaster stopped
(org.apache.spark.storage.BlockManagerMaster)
[2015-09-11 14:47:22,074] INFO OutputCommitCoordinator stopped!
(org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint)
[2015-09-11 14:47:22,074] INFO Successfully stopped SparkContext
(org.apache.spark.SparkContext)
[2015-09-11 14:47:22,074] INFO Shutdown hook called
(org.apache.spark.util.ShutdownHookManager)
[2015-09-11 14:47:22,075] INFO Deleting directory
/dfs/spark/tmp/spark-485adbf1-8df3-4685-a4f4-3840dedc4762
(org.apache.spark.util.ShutdownHookManager)
Here is the output when I tried to restart from the checkpoint after Ctrl+C:
[2015-09-11 22:33:05,644] INFO Reading from the logs:
file:/dfs/spark/checkpoints/receivedBlockMetadata/log-1442003410931-1442003470931
file:/dfs/spark/checkpoints/receivedBlockMetadata/log-1442003476320-1442003536320
(WriteAheadLogManager )
[2015-09-11 22:33:05,659] INFO Batches during down time (3 batches):
1442003460000 ms, 1442003520000 ms, 1442003580000 ms
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-11 22:33:05,660] INFO Batches pending processing (2 batches):
1442003400000 ms, 1442003460000 ms
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-11 22:33:05,660] INFO Batches to reschedule (4 batches):
1442003400000 ms, 1442003460000 ms, 1442003520000 ms, 1442003580000 ms
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-11 22:33:05,682] INFO Marking RDD 39 for time 1442003400000 ms for
checkpointing (org.apache.spark.streaming.dstream.StateDStream)
[2015-09-11 22:33:05,691] INFO Marking RDD 45 for time 1442003400000 ms for
checkpointing (org.apache.spark.streaming.dstream.StateDStream)
[2015-09-11 22:33:05,694] ERROR Error starting the context, marking it as
stopped (org.apache.spark.streaming.StreamingContext)
org.apache.spark.SparkException:
org.apache.spark.streaming.kafka.DirectKafkaInputDStream@17095b7 has not
been initialized
at
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
at
org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:88)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
at
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226)
at
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96)
at
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:82)
at
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:596)
at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[2015-09-11 22:33:05,695] INFO Stopping JobGenerator immediately
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-11 22:33:05,696] INFO Stopped timer for JobGenerator after time -1
(org.apache.spark.streaming.util.RecurringTimer)
[2015-09-11 22:33:05,722] INFO CheckpointWriter executor terminated ? true,
waited for 0 ms. (org.apache.spark.streaming.CheckpointWriter)
[2015-09-11 22:33:05,722] INFO Stopped JobGenerator
(org.apache.spark.streaming.scheduler.JobGenerator)
[2015-09-11 22:33:05,723] INFO Stopped JobScheduler
(org.apache.spark.streaming.scheduler.JobScheduler)
Exception in thread "main" org.apache.spark.SparkException:
org.apache.spark.streaming.kafka.DirectKafkaInputDStream@17095b7 has not
been initialized
at
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
at
org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:88)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
at scala.Option.orElse(Option.scala:289)
at
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
at
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
at
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at
org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)
at
org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226)
at
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96)
at
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:82)
at
org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:596)
at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[2015-09-11 22:33:05,725] INFO Invoking stop() from shutdown hook
(org.apache.spark.SparkContext)
[2015-09-11 22:33:05,747] INFO stopped
o.s.j.s.ServletContextHandler{/static/sql,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,747] INFO stopped
o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,747] INFO stopped
o.s.j.s.ServletContextHandler{/SQL/execution,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,748] INFO stopped
o.s.j.s.ServletContextHandler{/SQL/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,748] INFO stopped
o.s.j.s.ServletContextHandler{/SQL,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,748] INFO stopped
o.s.j.s.ServletContextHandler{/metrics/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,748] INFO stopped
o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,748] INFO stopped
o.s.j.s.ServletContextHandler{/api,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,748] INFO stopped
o.s.j.s.ServletContextHandler{/,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,748] INFO stopped
o.s.j.s.ServletContextHandler{/static,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,748] INFO stopped
o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,748] INFO stopped
o.s.j.s.ServletContextHandler{/executors/threadDump,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,749] INFO stopped
o.s.j.s.ServletContextHandler{/executors/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,749] INFO stopped
o.s.j.s.ServletContextHandler{/executors,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,749] INFO stopped
o.s.j.s.ServletContextHandler{/environment/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,749] INFO stopped
o.s.j.s.ServletContextHandler{/environment,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,749] INFO stopped
o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,749] INFO stopped
o.s.j.s.ServletContextHandler{/storage/rdd,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,749] INFO stopped
o.s.j.s.ServletContextHandler{/storage/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,749] INFO stopped
o.s.j.s.ServletContextHandler{/storage,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,749] INFO stopped
o.s.j.s.ServletContextHandler{/stages/pool/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,750] INFO stopped
o.s.j.s.ServletContextHandler{/stages/pool,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,750] INFO stopped
o.s.j.s.ServletContextHandler{/stages/stage/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,750] INFO stopped
o.s.j.s.ServletContextHandler{/stages/stage,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,750] INFO stopped
o.s.j.s.ServletContextHandler{/stages/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,750] INFO stopped
o.s.j.s.ServletContextHandler{/stages,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,750] INFO stopped
o.s.j.s.ServletContextHandler{/jobs/job/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,750] INFO stopped
o.s.j.s.ServletContextHandler{/jobs/job,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,750] INFO stopped
o.s.j.s.ServletContextHandler{/jobs/json,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,750] INFO stopped
o.s.j.s.ServletContextHandler{/jobs,null}
(org.spark-project.jetty.server.handler.ContextHandler)
[2015-09-11 22:33:05,802] INFO Stopped Spark web UI at
http://10.29.52.111:4040 (org.apache.spark.ui.SparkUI)
[2015-09-11 22:33:05,804] INFO Stopping DAGScheduler
(org.apache.spark.scheduler.DAGScheduler)
[2015-09-11 22:33:05,805] INFO Shutting down all executors
(org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend)
[2015-09-11 22:33:05,805] INFO Asking each executor to shut down
(org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend)
[2015-09-11 22:33:05,889] INFO MapOutputTrackerMasterEndpoint stopped!
(org.apache.spark.MapOutputTrackerMasterEndpoint)
[2015-09-11 22:33:05,894] INFO MemoryStore cleared
(org.apache.spark.storage.MemoryStore)
[2015-09-11 22:33:05,894] INFO BlockManager stopped
(org.apache.spark.storage.BlockManager)
[2015-09-11 22:33:05,897] INFO BlockManagerMaster stopped
(org.apache.spark.storage.BlockManagerMaster)
[2015-09-11 22:33:05,899] INFO OutputCommitCoordinator stopped!
(org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint)
[2015-09-11 22:33:05,899] INFO Successfully stopped SparkContext
(org.apache.spark.SparkContext)
[2015-09-11 22:33:05,899] INFO Shutdown hook called
(org.apache.spark.util.ShutdownHookManager)
[2015-09-11 22:33:05,899] INFO Deleting directory
/dfs/spark/tmp/spark-b466fc2e-9ab8-4783-87c2-485bac5c3cd6
(org.apache.spark.util.ShutdownHookManager)
Thanks,
Petr
On Mon, Sep 14, 2015 at 3:10 PM, Petr Novak <[email protected]> wrote:
> Based on the source code for StreamingContext.scala where there is:
>
> def start()
> match {
> case INITIALIZED =>
> ...
> shutdownHookRef = ShutdownHookManager.addShutdownHook(
> StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
> ...
> ...
> }
> }
>
> def stopOnShutdown() = {
> val stopGracefully =
> conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
> stop(stopSparkContext = false, stopGracefully = stopGracefully)
> }
>
> I tried to set spark.streaming.stopGracefullyOnShutdown=true in SparkConf.
> And removed ssc.stop(stopSparkContext = false, stopGracefully = true)
> from my code.
>
> On Ctrl+C it seems to shut down gracefully without any error, but the job
> can't be restarted: it fails with the same error, that DirectKafkaInputDStream
> has not been initialized. Only when the checkpoints are deleted can it be
> started again.
>
> Can anybody confirm that graceful shutdown works with
> DirectKafkaInputDStream?
>
> Many thanks,
> Petr
>
> On Fri, Sep 11, 2015 at 10:37 PM, Petr Novak <[email protected]> wrote:
>
>> The error seems to happen when there is data being processed. It is not
>> possible to start the job again from the checkpoint; it fails with the log
>> below. I'm using the direct Kafka stream.
>>
>>
>> Many thanks for any suggestion,
>> Petr
>>
>> [2015-09-11 22:33:05,644] INFO Reading from the logs:
>> file:/dfs/1/spark/warehouse/mco-checkpoints/receivedBlockMetadata/log-1442003410931-1442003470931
>> file:/dfs/1/spark/warehouse/mco-checkpoints/receivedBlockMetadata/log-1442003476320-1442003536320
>> (WriteAheadLogManager )
>> [2015-09-11 22:33:05,659] INFO Batches during down time (3 batches):
>> 1442003460000 ms, 1442003520000 ms, 1442003580000 ms
>> (org.apache.spark.streaming.scheduler.JobGenerator)
>> [2015-09-11 22:33:05,660] INFO Batches pending processing (2 batches):
>> 1442003400000 ms, 1442003460000 ms
>> (org.apache.spark.streaming.scheduler.JobGenerator)
>> [2015-09-11 22:33:05,660] INFO Batches to reschedule (4 batches):
>> 1442003400000 ms, 1442003460000 ms, 1442003520000 ms, 1442003580000 ms
>> (org.apache.spark.streaming.scheduler.JobGenerator)
>> [2015-09-11 22:33:05,682] INFO Marking RDD 39 for time 1442003400000 ms
>> for checkpointing (org.apache.spark.streaming.dstream.StateDStream)
>> [2015-09-11 22:33:05,691] INFO Marking RDD 45 for time 1442003400000 ms
>> for checkpointing (org.apache.spark.streaming.dstream.StateDStream)
>> [2015-09-11 22:33:05,694] ERROR Error starting the context, marking it as
>> stopped (org.apache.spark.streaming.StreamingContext)
>> org.apache.spark.SparkException:
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@17095b7 has not
>> been initialized
>> at
>> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
>> at
>> org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:88)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at scala.Option.orElse(Option.scala:289)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>> at
>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>> at
>> scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>> at
>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96)
>> at
>> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:82)
>> at
>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:596)
>> at
>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594)
>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>> at
>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>> at scala.collection.immutable.List.foreach(List.scala:381)
>> at
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> at scala.App$class.main(App.scala:76)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> [2015-09-11 22:33:05,695] INFO Stopping JobGenerator immediately
>> (org.apache.spark.streaming.scheduler.JobGenerator)
>> [2015-09-11 22:33:05,696] INFO Stopped timer for JobGenerator after time
>> -1 (org.apache.spark.streaming.util.RecurringTimer)
>> [2015-09-11 22:33:05,722] INFO CheckpointWriter executor terminated ?
>> true, waited for 0 ms. (org.apache.spark.streaming.CheckpointWriter)
>> [2015-09-11 22:33:05,722] INFO Stopped JobGenerator
>> (org.apache.spark.streaming.scheduler.JobGenerator)
>> [2015-09-11 22:33:05,723] INFO Stopped JobScheduler
>> (org.apache.spark.streaming.scheduler.JobScheduler)
>> Exception in thread "main" org.apache.spark.SparkException:
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@17095b7 has not
>> been initialized
>> at
>> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
>> at
>> org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:88)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at scala.Option.orElse(Option.scala:289)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>> at
>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>> at
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
>> at
>> scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>> at
>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226)
>> at
>> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96)
>> at
>> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:82)
>> at
>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:596)
>> at
>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594)
>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>> at
>> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>> at scala.collection.immutable.List.foreach(List.scala:381)
>> at
>> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> at scala.App$class.main(App.scala:76)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> [2015-09-11 22:33:05,725] INFO Invoking stop() from shutdown hook
>> (org.apache.spark.SparkContext)
>> [2015-09-11 22:33:05,747] INFO stopped
>> o.s.j.s.ServletContextHandler{/static/sql,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,747] INFO stopped
>> o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,747] INFO stopped
>> o.s.j.s.ServletContextHandler{/SQL/execution,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,748] INFO stopped
>> o.s.j.s.ServletContextHandler{/SQL/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,748] INFO stopped
>> o.s.j.s.ServletContextHandler{/SQL,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,748] INFO stopped
>> o.s.j.s.ServletContextHandler{/metrics/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,748] INFO stopped
>> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,748] INFO stopped
>> o.s.j.s.ServletContextHandler{/api,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,748] INFO stopped
>> o.s.j.s.ServletContextHandler{/,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,748] INFO stopped
>> o.s.j.s.ServletContextHandler{/static,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,748] INFO stopped
>> o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,748] INFO stopped
>> o.s.j.s.ServletContextHandler{/executors/threadDump,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,749] INFO stopped
>> o.s.j.s.ServletContextHandler{/executors/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,749] INFO stopped
>> o.s.j.s.ServletContextHandler{/executors,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,749] INFO stopped
>> o.s.j.s.ServletContextHandler{/environment/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,749] INFO stopped
>> o.s.j.s.ServletContextHandler{/environment,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,749] INFO stopped
>> o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,749] INFO stopped
>> o.s.j.s.ServletContextHandler{/storage/rdd,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,749] INFO stopped
>> o.s.j.s.ServletContextHandler{/storage/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,749] INFO stopped
>> o.s.j.s.ServletContextHandler{/storage,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,749] INFO stopped
>> o.s.j.s.ServletContextHandler{/stages/pool/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,750] INFO stopped
>> o.s.j.s.ServletContextHandler{/stages/pool,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,750] INFO stopped
>> o.s.j.s.ServletContextHandler{/stages/stage/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,750] INFO stopped
>> o.s.j.s.ServletContextHandler{/stages/stage,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,750] INFO stopped
>> o.s.j.s.ServletContextHandler{/stages/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,750] INFO stopped
>> o.s.j.s.ServletContextHandler{/stages,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,750] INFO stopped
>> o.s.j.s.ServletContextHandler{/jobs/job/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,750] INFO stopped
>> o.s.j.s.ServletContextHandler{/jobs/job,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,750] INFO stopped
>> o.s.j.s.ServletContextHandler{/jobs/json,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,750] INFO stopped
>> o.s.j.s.ServletContextHandler{/jobs,null}
>> (org.spark-project.jetty.server.handler.ContextHandler)
>> [2015-09-11 22:33:05,802] INFO Stopped Spark web UI at
>> http://10.29.52.111:4040 (org.apache.spark.ui.SparkUI)
>> [2015-09-11 22:33:05,804] INFO Stopping DAGScheduler
>> (org.apache.spark.scheduler.DAGScheduler)
>> [2015-09-11 22:33:05,805] INFO Shutting down all executors
>> (org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend)
>> [2015-09-11 22:33:05,805] INFO Asking each executor to shut down
>> (org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend)
>> [2015-09-11 22:33:05,889] INFO MapOutputTrackerMasterEndpoint stopped!
>> (org.apache.spark.MapOutputTrackerMasterEndpoint)
>> [2015-09-11 22:33:05,894] INFO MemoryStore cleared
>> (org.apache.spark.storage.MemoryStore)
>> [2015-09-11 22:33:05,894] INFO BlockManager stopped
>> (org.apache.spark.storage.BlockManager)
>> [2015-09-11 22:33:05,897] INFO BlockManagerMaster stopped
>> (org.apache.spark.storage.BlockManagerMaster)
>> [2015-09-11 22:33:05,899] INFO OutputCommitCoordinator stopped!
>> (org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint)
>> [2015-09-11 22:33:05,899] INFO Successfully stopped SparkContext
>> (org.apache.spark.SparkContext)
>> [2015-09-11 22:33:05,899] INFO Shutdown hook called
>> (org.apache.spark.util.ShutdownHookManager)
>> [2015-09-11 22:33:05,899] INFO Deleting directory
>> /dfs/1/spark/tmp/spark-b466fc2e-9ab8-4783-87c2-485bac5c3cd6
>> (org.apache.spark.util.ShutdownHookManager)
>>
>> On Fri, Sep 11, 2015 at 3:41 PM, Petr Novak <[email protected]> wrote:
>>
>>> Hi,
>>> I sometimes get java.lang.InterruptedException on Ctrl+C sometimes it
>>> goes through fine.
>>>
>>> I have this code now:
>>>
>>> ... some stream processing ...
>>> ssc.start()
>>> ssc.awaitTermination()
>>> ssc.stop(stopSparkContext = false, stopGracefully = true)
>>>
>>> I want to exit completely, so I'm not sure if I should set
>>> stopSparkContext to true.
>>>
>>> It seems there is an undocumented config parameter
>>> spark.streaming.stopGracefullyOnShutdown (Boolean). If set to true, is
>>> this code (without ssc.stop) equivalent to the previous one?
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>>
>>>
>>>
>>> Here is the error output:
>>>
>>> [2015-09-11 14:47:16,662] INFO Starting task 0.0 in stage 1.0 (TID 1,
>>> 10.29.52.111, ANY, 2026 bytes) (org.apache.spark.scheduler.TaskSetManager)
>>> [2015-09-11 14:47:16,662] INFO ensureFreeSpace(1600) called with
>>> curMem=6769, maxMem=556038881 (org.apache.spark.storage.MemoryStore)
>>> [2015-09-11 14:47:16,662] INFO Block broadcast_2 stored as values in
>>> memory (estimated size 1600.0 B, free 530.3 MB)
>>> (org.apache.spark.storage.MemoryStore)
>>> [2015-09-11 14:47:16,663] INFO ensureFreeSpace(1020) called with
>>> curMem=8369, maxMem=556038881 (org.apache.spark.storage.MemoryStore)
>>> [2015-09-11 14:47:16,663] INFO Block broadcast_2_piece0 stored as bytes
>>> in memory (estimated size 1020.0 B, free 530.3 MB)
>>> (org.apache.spark.storage.MemoryStore)
>>> [2015-09-11 14:47:16,671] INFO Added broadcast_2_piece0 in memory on
>>> 10.29.52.111:51578 (size: 1020.0 B, free: 530.3 MB)
>>> (org.apache.spark.storage.BlockManagerInfo)
>>> [2015-09-11 14:47:16,671] INFO Created broadcast 2 from broadcast at
>>> DAGScheduler.scala:861 (org.apache.spark.SparkContext)
>>> [2015-09-11 14:47:16,672] INFO Submitting 2 missing tasks from
>>> ShuffleMapStage 2 (ParallelCollectionRDD[0] at parallelize at
>>> Aggregator.scala:174) (org.apache.spark.scheduler.DAGScheduler)
>>> [2015-09-11 14:47:16,672] INFO Adding task set 2.0 with 2 tasks
>>> (org.apache.spark.scheduler.TaskSchedulerImpl)
>>> [2015-09-11 14:47:16,673] INFO Added broadcast_1_piece0 in memory on
>>> 10.29.52.111:50459 (size: 2.4 KB, free: 530.3 MB)
>>> (org.apache.spark.storage.BlockManagerInfo)
>>> ^C[2015-09-11 14:47:19,896] INFO Invoking stop(stopGracefully=false)
>>> from shutdown hook (org.apache.spark.streaming.StreamingContext)
>>> [2015-09-11 14:47:19,896] INFO Stopping JobGenerator immediately
>>> (org.apache.spark.streaming.scheduler.JobGenerator)
>>> [2015-09-11 14:47:19,897] INFO Stopped timer for JobGenerator after time
>>> 1441975620000 (org.apache.spark.streaming.util.RecurringTimer)
>>> [2015-09-11 14:47:19,897] INFO CheckpointWriter executor terminated ?
>>> true, waited for 0 ms. (org.apache.spark.streaming.CheckpointWriter)
>>> [2015-09-11 14:47:19,898] INFO Stopped JobGenerator
>>> (org.apache.spark.streaming.scheduler.JobGenerator)
>>> [2015-09-11 14:47:21,899] ERROR Aborting job.
>>> (org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation)
>>> java.lang.InterruptedException
>>> at java.lang.Object.wait(Native Method)
>>> at java.lang.Object.wait(Object.java:503)
>>> at
>>> org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
>>> at
>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:559)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1813)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1826)
>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
>>> at
>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
>>> at
>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
>>> at
>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
>>> at
>>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>>> at
>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
>>> at
>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>>> at
>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>>> at
>>> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:927)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:927)
>>> at
>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
>>> at
>>> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
>>> at
>>> org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
>>> at
>>> ...AllMessagesAggregator$$anonfun$aggregate$2.apply(AllMessagesAggregator.scala:90)
>>> at
>>> ...AllMessagesAggregator$$anonfun$aggregate$2.apply(AllMessagesAggregator.scala:73)
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
>>> at
>>> org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>> at
>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>> at scala.util.Try$.apply(Try.scala:192)
>>> at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:207)
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:207)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>> at
>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:206)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:744)
>>> [2015-09-11 14:47:21,899] INFO Stopped JobScheduler
>>> (org.apache.spark.streaming.scheduler.JobScheduler)
>>> [2015-09-11 14:47:21,901] ERROR Job job_201509111447_0000 aborted.
>>> (org.apache.spark.sql.execution.datasources.DefaultWriterContainer)
>>> [2015-09-11 14:47:21,903] INFO stopped
>>> o.s.j.s.ServletContextHandler{/streaming,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,904] INFO stopped
>>> o.s.j.s.ServletContextHandler{/streaming/batch,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,905] INFO stopped
>>> o.s.j.s.ServletContextHandler{/static/streaming,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,906] INFO StreamingContext stopped successfully
>>> (org.apache.spark.streaming.StreamingContext)
>>> [2015-09-11 14:47:21,906] INFO Invoking stop() from shutdown hook
>>> (org.apache.spark.SparkContext)
>>> [2015-09-11 14:47:21,906] WARN StreamingContext has already been stopped
>>> (org.apache.spark.streaming.StreamingContext)
>>> [2015-09-11 14:47:21,927] INFO stopped
>>> o.s.j.s.ServletContextHandler{/streaming/batch/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,928] INFO stopped
>>> o.s.j.s.ServletContextHandler{/streaming/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,928] INFO stopped
>>> o.s.j.s.ServletContextHandler{/static/sql,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,928] INFO stopped
>>> o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,928] INFO stopped
>>> o.s.j.s.ServletContextHandler{/SQL/execution,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,928] INFO stopped
>>> o.s.j.s.ServletContextHandler{/SQL/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,928] INFO stopped
>>> o.s.j.s.ServletContextHandler{/SQL,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,928] INFO stopped
>>> o.s.j.s.ServletContextHandler{/metrics/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,928] INFO stopped
>>> o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,929] INFO stopped
>>> o.s.j.s.ServletContextHandler{/api,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,929] INFO stopped
>>> o.s.j.s.ServletContextHandler{/,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,929] INFO stopped
>>> o.s.j.s.ServletContextHandler{/static,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,929] INFO stopped
>>> o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,929] INFO stopped
>>> o.s.j.s.ServletContextHandler{/executors/threadDump,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,929] INFO stopped
>>> o.s.j.s.ServletContextHandler{/executors/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,929] INFO stopped
>>> o.s.j.s.ServletContextHandler{/executors,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,929] INFO stopped
>>> o.s.j.s.ServletContextHandler{/environment/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,930] INFO stopped
>>> o.s.j.s.ServletContextHandler{/environment,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,930] INFO stopped
>>> o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,930] INFO stopped
>>> o.s.j.s.ServletContextHandler{/storage/rdd,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,930] INFO stopped
>>> o.s.j.s.ServletContextHandler{/storage/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,930] INFO stopped
>>> o.s.j.s.ServletContextHandler{/storage,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,930] INFO stopped
>>> o.s.j.s.ServletContextHandler{/stages/pool/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,931] INFO stopped
>>> o.s.j.s.ServletContextHandler{/stages/pool,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,931] INFO stopped
>>> o.s.j.s.ServletContextHandler{/stages/stage/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,931] INFO stopped
>>> o.s.j.s.ServletContextHandler{/stages/stage,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,931] INFO stopped
>>> o.s.j.s.ServletContextHandler{/stages/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,931] INFO stopped
>>> o.s.j.s.ServletContextHandler{/stages,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,931] INFO stopped
>>> o.s.j.s.ServletContextHandler{/jobs/job/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,931] INFO stopped
>>> o.s.j.s.ServletContextHandler{/jobs/job,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,931] INFO stopped
>>> o.s.j.s.ServletContextHandler{/jobs/json,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,931] INFO stopped
>>> o.s.j.s.ServletContextHandler{/jobs,null}
>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>> [2015-09-11 14:47:21,984] INFO Stopped Spark web UI at
>>> http://10.29.52.111:4040 (org.apache.spark.ui.SparkUI)
>>> [2015-09-11 14:47:21,986] INFO Stopping DAGScheduler
>>> (org.apache.spark.scheduler.DAGScheduler)
>>> [2015-09-11 14:47:21,988] INFO ShuffleMapStage 1 (flatMap at
>>> Aggregator.scala:62) failed in 5.327 s
>>> (org.apache.spark.scheduler.DAGScheduler)
>>> [2015-09-11 14:47:21,988] INFO ShuffleMapStage 2 (parallelize at
>>> Aggregator.scala:174) failed in 5.316 s
>>> (org.apache.spark.scheduler.DAGScheduler)
>>> [2015-09-11 14:47:21,988] INFO Shutting down all executors
>>> (org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend)
>>> [2015-09-11 14:47:21,989] INFO Asking each executor to shut down
>>> (org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend)
>>> [2015-09-11 14:47:22,069] INFO MapOutputTrackerMasterEndpoint stopped!
>>> (org.apache.spark.MapOutputTrackerMasterEndpoint)
>>> [2015-09-11 14:47:22,072] INFO MemoryStore cleared
>>> (org.apache.spark.storage.MemoryStore)
>>> [2015-09-11 14:47:22,072] INFO BlockManager stopped
>>> (org.apache.spark.storage.BlockManager)
>>> [2015-09-11 14:47:22,073] INFO BlockManagerMaster stopped
>>> (org.apache.spark.storage.BlockManagerMaster)
>>> [2015-09-11 14:47:22,074] INFO OutputCommitCoordinator stopped!
>>> (org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint)
>>> [2015-09-11 14:47:22,074] INFO Successfully stopped SparkContext
>>> (org.apache.spark.SparkContext)
>>> [2015-09-11 14:47:22,074] INFO Shutdown hook called
>>> (org.apache.spark.util.ShutdownHookManager)
>>> [2015-09-11 14:47:22,075] INFO Deleting directory
>>> /dfs/1/spark/tmp/spark-485adbf1-8df3-4685-a4f4-3840dedc4762
>>> (org.apache.spark.util.ShutdownHookManager)
>>>
>>> Thanks,
>>> Petr
>>>
>>> On Fri, Sep 11, 2015 at 1:08 PM, Petr Novak <[email protected]>
>>> wrote:
>>>
>>>> I'm sorry for being lazy and not debugging with jstack, but removing the
>>>> custom shutdown hook seems to solve the problem.
>>>>
>>>> So many thanks to you for enabling my upgrade to Spark 1.5.
>>>>
>>>> Petr
>>>>
>>>> On Thu, Sep 10, 2015 at 9:54 PM, Tathagata Das <[email protected]>
>>>> wrote:
>>>>
>>>>> Spark 1.4.0 introduced built-in shutdown hooks that shut down the
>>>>> StreamingContext and SparkContext (similar to yours). If you are also
>>>>> introducing your own shutdown hook, I wonder what the behavior is going
>>>>> to be.
>>>>>
>>>>> Try doing a jstack to see where the system is stuck. Alternatively,
>>>>> remove your shutdown hook and see what happens.
>>>>>
>>>>> On Thu, Sep 10, 2015 at 3:11 AM, Petr Novak <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>> my Spark streaming v1.3.0 code uses
>>>>>>
>>>>>> sys.ShutdownHookThread {
>>>>>> ssc.stop(stopSparkContext = true, stopGracefully = true)
>>>>>> }
>>>>>>
>>>>>> so that Ctrl+C on the command line stops it. It returned to the command
>>>>>> line after it finished the batch, but it doesn't with v1.4.0-v1.5.0. Has
>>>>>> the behaviour or the required code changed?
>>>>>>
>>>>>> The last messages are:
>>>>>>
>>>>>> [2015-09-08 13:02:43,300] INFO Waited for jobs to be processed and
>>>>>> checkpoints to be written
>>>>>> (org.apache.spark.streaming.scheduler.JobGenerator)
>>>>>> [2015-09-08 13:02:43,300] INFO CheckpointWriter executor terminated ?
>>>>>> true, waited for 0 ms. (org.apache.spark.streaming.CheckpointWriter)
>>>>>> [2015-09-08 13:02:43,301] INFO Stopped JobGenerator
>>>>>> (org.apache.spark.streaming.scheduler.JobGenerator)
>>>>>> [2015-09-08 13:02:43,302] INFO Stopped JobScheduler
>>>>>> (org.apache.spark.streaming.scheduler.JobScheduler)
>>>>>> [2015-09-08 13:02:43,303] INFO stopped
>>>>>> o.s.j.s.ServletContextHandler{/streaming,null}
>>>>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>>>>> [2015-09-08 13:02:43,305] INFO stopped
>>>>>> o.s.j.s.ServletContextHandler{/streaming/batch,null}
>>>>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>>>>> [2015-09-08 13:02:43,307] INFO stopped
>>>>>> o.s.j.s.ServletContextHandler{/static/streaming,null}
>>>>>> (org.spark-project.jetty.server.handler.ContextHandler)
>>>>>>
>>>>>> Thank you for any explanation,
>>>>>> Petr
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>