Hi everyone.
I have some problems running multiple streams at the same time.
What i am doing is:
object Test {
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming._
import org.apache.spark.streaming.api._
def main(args: Array[String]) {
val sc = new SparkContext(new
SparkConf().setMaster("local[10]").setAppName("test"))
val task1 = startListening(sc, Seconds(5))
val task2 = startListening(sc, Seconds(5))
val task3 = startListening(sc, Seconds(5))
val task4 = startListening(sc, Seconds(5))
Thread.sleep(10000)
println("Killing...")
task1.stop()
task2.stop()
task3.stop()
task4.stop()
}
private def startListening(sc: SparkContext, duration: Duration):
StreamingTask = {
val ssc = new StreamingContext(sc, duration)
val lines = ssc.socketTextStream("localhost", 9999)
lines.foreachRDD { rdd =>
println(rdd.collect().mkString("===="))
}
ssc.start()
StreamingTask(ssc)
}
}
case class StreamingTask(ssc: StreamingContext) {
def stop() {
ssc.stop(stopSparkContext = false, stopGracefully = false)
}
}
The idea is that i am sharing the same SparkContext between different
Streaming contexts.
What i am getting is:
Exception in thread "main" akka.actor.InvalidActorNameException: actor name
[JobScheduler] is not unique!
at
akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
at akka.actor.ActorCell.reserveChild(ActorCell.scala:338)
at akka.actor.dungeon.Children$class.makeChild(Children.scala:186)
at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
at akka.actor.ActorCell.attachChild(ActorCell.scala:338)
at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
at
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:57)
at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:438)
at io.ubix.spark.Test$.startListening(Test.scala:38)
at io.ubix.spark.Test$.main(Test.scala:20)
at io.ubix.spark.Test.main(Test.scala)
And this is true because we are trying to create 4 JobScheduler's actors
with the same name.
>From other hand - the question is how to deal with this kind of situations?
Am i doing something wrong?
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-streams-at-the-same-time-tp9819.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.