Hi everyone.

I have some problems running multiple streams at the same time.

What i am doing is:

object Test {
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.StreamingContext._
  import org.apache.spark.api.java.function._
  import org.apache.spark.streaming._
  import org.apache.spark.streaming.api._

  def main(args: Array[String]) {
    val sc = new SparkContext(new
SparkConf().setMaster("local[10]").setAppName("test"))
    val task1 = startListening(sc, Seconds(5))
    val task2 = startListening(sc, Seconds(5))
    val task3 = startListening(sc, Seconds(5))
    val task4 = startListening(sc, Seconds(5))

    Thread.sleep(10000)
    println("Killing...")
    task1.stop()
    task2.stop()
    task3.stop()
    task4.stop()
  }

  private def startListening(sc: SparkContext, duration: Duration):
StreamingTask = {
    val ssc = new StreamingContext(sc, duration)
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.foreachRDD { rdd =>
      println(rdd.collect().mkString("===="))
    }
    ssc.start()
    StreamingTask(ssc)
  }
}

case class StreamingTask(ssc: StreamingContext) {
  def stop() {
    ssc.stop(stopSparkContext = false, stopGracefully = false)
  }
}

The idea is that i am sharing the same SparkContext between different
Streaming contexts.
What i am getting is:

Exception in thread "main" akka.actor.InvalidActorNameException: actor name
[JobScheduler] is not unique!
        at
akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
        at akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
        at akka.actor.ActorCell.reserveChild(ActorCell.scala:338)
        at akka.actor.dungeon.Children$class.makeChild(Children.scala:186)
        at akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
        at akka.actor.ActorCell.attachChild(ActorCell.scala:338)
        at akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
        at
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:57)
        at
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:438)
        at io.ubix.spark.Test$.startListening(Test.scala:38)
        at io.ubix.spark.Test$.main(Test.scala:20)
        at io.ubix.spark.Test.main(Test.scala)

And this is true because we are trying to create 4 JobScheduler's actors
with the same name.
>From other hand - the question is how to deal with this kind of situations?
Am i doing something wrong?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Multiple-streams-at-the-same-time-tp9819.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reply via email to