Hi, I’m trying to submit a streaming application to my standalone Spark cluster. This is my code:
import akka.actor.{Props, ActorSystem, Actor}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.Uri
import akka.stream.ActorMaterializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper
import org.apache.spark.{SparkEnv, SparkContext, SparkConf}

/**
 * Driver entry point: builds the streaming context, registers an actor-based
 * input stream backed by [[CustomReceiver]], prints every batch and then
 * starts feeding the receiver through [[ChunkReceiver]].
 */
object Boot {

  // An explicit main() is used instead of `extends App`: the body of an App
  // runs through delayed initialisation, and the Spark documentation advises
  // defining main() for applications.
  def main(args: Array[String]): Unit = {
    val driverPort = 7777

    val conf = new SparkConf()
      .setAppName("TestSpark")
      .set("spark.driver.port", driverPort.toString)
      .set("spark.driver.host", "localhost")
      .setMaster("spark://<master-uri>:7077")
      .set("spark.akka.logLifecycleEvents", "true")

    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))

    val meetup = ssc.actorStream[String](Props[CustomReceiver], "attore")
    meetup.print()

    // The receiver actor only exists once the streaming context is running,
    // so the context is started BEFORE the feeder begins sending chunks.
    ssc.start()
    ChunkReceiver.start(driverPort, "localhost", "attore")

    // Block the main thread until the streaming computation stops or fails.
    ssc.awaitTermination()
  }
}

/** Reads the Meetup RSVP HTTP stream and forwards every chunk to the receiver actor. */
object ChunkReceiver {

  /**
   * Starts forwarding the HTTP stream to the receiver actor. The method returns
   * as soon as the request has been issued; the chunks are forwarded asynchronously.
   *
   * @param driverPort port used to build the remote address of the receiver actor
   * @param driverHost host used to build the remote address of the receiver actor
   * @param actorName  name under which the receiver actor was registered
   */
  def start(driverPort: Int, driverHost: String, actorName: String): Unit = {
    // Crude grace period that gives the receiver time to come up; callers are
    // expected to have started the StreamingContext before calling this method.
    Thread.sleep(5000)

    implicit val system = ActorSystem("client")
    import system.dispatcher
    implicit val materializer = ActorMaterializer()

    // NOTE(review): this address assumes the receiver actor lives in the
    // driver's actor system. On a standalone cluster the receiver is presumably
    // started on an executor, in which case this selection resolves to nothing
    // and every chunk is dropped - confirm against the custom receiver guide.
    val url = s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/Supervisor0/$actorName"
    val actor = SparkEnv.get.actorSystem.actorSelection(url)
    val source = Uri("http://stream.meetup.com/2/rsvps")

    val forwarding = Http()
      .singleRequest(HttpRequest(uri = source))
      .flatMap { response =>
        response.entity.dataBytes.runForeach { chunk =>
          actor ! chunk.utf8String
        }
      }

    // Report a failed request or a broken stream instead of dropping it silently.
    forwarding.failed.foreach { error =>
      System.err.println(s"Streaming from $source to $url failed: $error")
    }
  }
}

/** Receiver actor: stores every String message it receives into Spark Streaming. */
class CustomReceiver extends Actor with ActorHelper {
  log.info("Starting receiver ...")

  override def receive: Receive = {
    case s: String => store(s)
  }
}

And these are the messages generated by Spark:
15/12/21 06:43:24 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/12/21 06:43:26 ERROR ErrorMonitor: AssociationError [akka.tcp://sparkDriver@localhost:7777] <- [akka.tcp://driverPropsFetcher@<ip-cluster>:51397]: Error [Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:51397] [ akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:51397 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. ] akka.event.Logging$Error$NoCause$ 15/12/21 06:43:26 ERROR EndpointWriter: AssociationError [akka.tcp://sparkDriver@localhost:7777] <- [akka.tcp://driverPropsFetcher@<ip-cluster>:51397]: Error [Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:51397] [ akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:51397 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. ] 15/12/21 06:43:26 ERROR ErrorMonitor: AssociationError [akka.tcp://sparkDriver@localhost:7777] <- [akka.tcp://driverPropsFetcher@<ip-cluster>:51463]: Error [Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:51463] [ akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:51463 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. ] akka.event.Logging$Error$NoCause$ 15/12/21 06:43:26 ERROR EndpointWriter: AssociationError [akka.tcp://sparkDriver@localhost:7777] <- [akka.tcp://driverPropsFetcher@<ip-cluster>:51463]: Error [Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:51463] [ akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:51463 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. 
] 15/12/21 06:43:26 ERROR ErrorMonitor: AssociationError [akka.tcp://sparkDriver@localhost:7777] <- [akka.tcp://driverPropsFetcher@<ip-cluster>:59128]: Error [Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:59128] [ akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:59128 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. ] akka.event.Logging$Error$NoCause$ 15/12/21 06:43:26 ERROR EndpointWriter: AssociationError [akka.tcp://sparkDriver@localhost:7777] <- [akka.tcp://driverPropsFetcher@<ip-cluster>:59128]: Error [Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:59128] [ akka.remote.ShutDownAssociation: Shut down address: akka.tcp://driverPropsFetcher@<ip-cluster>:59128 Caused by: akka.remote.transport.Transport$InvalidAssociationException: The remote system terminated the association because it is shutting down. ] the application doesn’t terminate with errors, but the CustomReceiver doesn’t receive any data. Thank you very much for the help, Luca Guerra -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Problem-with-Spark-Standalone-tp25750.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org