Hi,
I’m trying to submit a streaming application to my standalone Spark cluster.
This is my code:

import akka.actor.{Props, ActorSystem, Actor}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.Uri
import akka.stream.ActorMaterializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.ActorHelper
import org.apache.spark.{SparkEnv, SparkContext, SparkConf}

object Boot extends App {
  val driverPort = 7777
  val conf = new SparkConf().
    setAppName("TestSpark").
    set("spark.driver.port", driverPort.toString).
    set("spark.driver.host", "localhost").
    setMaster("spark://<master-uri>:7077").
    set("spark.akka.logLifecycleEvents", "true")

  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(1))
  val meetup = ssc.actorStream[String](Props[CustomReceiver], "attore")
  ChunkReceiver.start(driverPort, "localhost", "attore")
  meetup.print()
  ssc.start()
}

object ChunkReceiver {
  def start(driverPort: Int, driverHost: String, actorName: String): Unit = {
    Thread.sleep(5000)
    implicit val system = ActorSystem("client")
    import system.dispatcher

    implicit val materializer = ActorMaterializer()
    val url =
      s"akka.tcp://sparkDriver@$driverHost:$driverPort/user/Supervisor0/$actorName"
    val actor = SparkEnv.get.actorSystem.actorSelection(url)
    val source = Uri("http://stream.meetup.com/2/rsvps")
    Http().
      singleRequest(HttpRequest(uri = source)).
      flatMap { response =>
        response.entity.dataBytes.runForeach { chunk =>
          actor ! chunk.utf8String
        }
      }
  }
}

class CustomReceiver extends Actor with ActorHelper {
  log.info("Starting receiver ...")
  override def receive = {
    case s:String => store(s)
  }
}
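
For reference, the HTTP side can be checked on its own, outside Spark, with
something like the sketch below (the StreamCheck name is only for this
example; it simply prints the chunks instead of forwarding them to the
receiver actor):

// Standalone sketch (not tied to Spark): print the chunks coming from the
// meetup stream instead of sending them to the receiver actor.
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, Uri}
import akka.stream.ActorMaterializer

object StreamCheck extends App {
  implicit val system = ActorSystem("stream-check")
  implicit val materializer = ActorMaterializer()
  import system.dispatcher

  Http()
    .singleRequest(HttpRequest(uri = Uri("http://stream.meetup.com/2/rsvps")))
    .flatMap(_.entity.dataBytes.runForeach(chunk => println(chunk.utf8String)))
}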

And these are the messages generated by Spark:

15/12/21 06:43:24 WARN MetricsSystem: Using default name DAGScheduler for
source because spark.app.id is not set.
15/12/21 06:43:26 ERROR ErrorMonitor: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:51397]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:51397] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:51397
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
akka.event.Logging$Error$NoCause$
15/12/21 06:43:26 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:51397]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:51397] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:51397
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
15/12/21 06:43:26 ERROR ErrorMonitor: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:51463]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:51463] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:51463
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
akka.event.Logging$Error$NoCause$
15/12/21 06:43:26 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:51463]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:51463] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:51463
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
15/12/21 06:43:26 ERROR ErrorMonitor: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:59128]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:59128] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:59128
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]
akka.event.Logging$Error$NoCause$
15/12/21 06:43:26 ERROR EndpointWriter: AssociationError
[akka.tcp://sparkDriver@localhost:7777] <-
[akka.tcp://driverPropsFetcher@<ip-cluster>:59128]: Error [Shut down
address: akka.tcp://driverPropsFetcher@<ip-cluster>:59128] [
akka.remote.ShutDownAssociation: Shut down address:
akka.tcp://driverPropsFetcher@<ip-cluster>:59128
Caused by: akka.remote.transport.Transport$InvalidAssociationException: The
remote system terminated the association because it is shutting down.
]

The application doesn’t terminate with an error, but the CustomReceiver
never receives any data.
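
A rough way to verify whether anything is actually listening at the actor
path built in ChunkReceiver would be to ask that path to identify itself,
along these lines (untested sketch; the PathCheck name and the "check"
correlation id are only for this example):

// Untested sketch: ask the remote path to identify itself and print whether
// an ActorRef comes back. Meant to run on the driver after ssc.start().
import scala.concurrent.duration._
import akka.actor.{ActorIdentity, Identify}
import akka.pattern.ask
import akka.util.Timeout
import org.apache.spark.SparkEnv

object PathCheck {
  def check(url: String): Unit = {
    val system = SparkEnv.get.actorSystem
    import system.dispatcher

    implicit val timeout = Timeout(5.seconds)
    (system.actorSelection(url) ? Identify("check")).mapTo[ActorIdentity].foreach {
      case ActorIdentity(_, Some(ref)) => println(s"Receiver resolved to: $ref")
      case ActorIdentity(_, None)      => println(s"Nothing is listening at: $url")
    }
  }
}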

Thank you very much for the help,
Luca Guerra



