Hi,

it seems like I am unable to shut down my StreamingContext properly, both
in local[n] and yarn-cluster mode. In addition, (only) in yarn-cluster
mode, starting a new StreamingContext afterwards raises
an InvalidActorNameException.

I use

  logger.info("stoppingStreamingContext")
  staticStreamingContext.stop(stopSparkContext=false,
    stopGracefully=true)
  logger.debug("done")

and have in my output logs

  19:16:47.708 [ForkJoinPool-2-worker-11] INFO  stopping StreamingContext
    [... output from other threads ...]
  19:17:07.729 [ForkJoinPool-2-worker-11] WARN  scheduler.JobGenerator -
Timed out while stopping the job generator (timeout = 20000)
  19:17:07.739 [ForkJoinPool-2-worker-11] DEBUG done

The processing itself is complete, i.e., the batch being processed at
the time of stop() is finished and no further batches are processed.
However, something keeps the streaming context from stopping properly. In
local[n] mode, this is not actually a problem (other than that I have to
wait 20 seconds for shutdown), but in yarn-cluster mode, I get an error

  akka.actor.InvalidActorNameException: actor name [JobGenerator] is not
unique!

when I start a (newly created) StreamingContext, and I was wondering
* what the issue with stop() is, and
* what the difference between local[n] and yarn-cluster mode is.
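For reference, this is roughly the sequence I run, heavily simplified; the
batch interval, the socket source, and names like runBatchOfStreams are
placeholders for illustration, not my actual code:

  import org.apache.spark.SparkContext
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  def runBatchOfStreams(sc: SparkContext): Unit = {
    // first streaming context
    val ssc1 = new StreamingContext(sc, Seconds(2))
    ssc1.socketTextStream("localhost", 9999).print()
    ssc1.start()
    // ... later, once the current batch has finished ...
    ssc1.stop(stopSparkContext = false, stopGracefully = true)

    // second streaming context on the same SparkContext; in yarn-cluster
    // mode this is where the InvalidActorNameException is raised for me
    val ssc2 = new StreamingContext(sc, Seconds(2))
    ssc2.socketTextStream("localhost", 9999).print()
    ssc2.start()
    ssc2.awaitTermination()
  }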

Some possible reasons:
* On my executors, I use a networking library that depends on netty and
doesn't properly shut down the event loop. (That has not been a problem in
the past, though.)
* I have a non-empty state (from using updateStateByKey()) that is
checkpointed to /tmp/spark (in local mode) and hdfs:///tmp/spark (in
yarn-cluster mode), could that be an issue? (In fact, I have not seen this
error in any non-stateful stream applications before; a simplified sketch
of the stateful setup is below.)
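In case it matters, the stateful part looks roughly like this (a simplified
sketch, given an existing SparkContext sc; the word-count state is just for
illustration, not my real update function):

  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._

  val ssc = new StreamingContext(sc, Seconds(2))
  ssc.checkpoint("/tmp/spark")  // hdfs:///tmp/spark in yarn-cluster mode

  val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

  // running count per word, carried across batches via updateStateByKey
  val counts = words.map(w => (w, 1)).updateStateByKey[Int] {
    (newValues: Seq[Int], state: Option[Int]) =>
      Some(newValues.sum + state.getOrElse(0))
  }
  counts.print()

  ssc.start()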

Any help much appreciated!

Thanks
Tobias
