Hi,
I have a Spark Kafka streaming application that works when I run it with a
local Spark context, but not with a remote one.
My code consists of:
1. A spring-boot application that creates the context
2. A shaded jar file containing all of my spark code
On my pc (windows), I have a spark (1.5.2) master and worker running
(spark-1.5.2-bin-hadoop2.6).
The entry point for my application is the start() method.
The code is:
/**
 * Entry point of the streaming job.
 *
 * Creates the streaming context, builds the Kafka stream for "myTopic",
 * maps every line through `MapFunctions.lineToSparkEventData`, prints one
 * element per batch, then starts the context and blocks until it terminates.
 */
@throws(classOf[Exception])
def start: Unit = {
  val ssc: StreamingContext = createStreamingContext
  val messagesRDD = createKafkaDStream(ssc, "myTopic", 2)
  // `val`, not `def`: a `def` builds a new mapped DStream on every reference.
  val datasRDD = messagesRDD.map((line: String) => MapFunctions.lineToSparkEventData(line))
  // The only output operation registered on the stream.
  datasRDD.print(1)
  ssc.start()
  ssc.awaitTermination()
}
/**
 * Builds the StreamingContext used by the job.
 *
 * Sets the Hadoop home and `spark.streaming.concurrentJobs` system
 * properties from `configContainer`, creates the context from
 * `createSparkConf()` with the configured batch duration, and tags the
 * underlying SparkContext with a job group and scheduler pool.
 *
 * @return the configured, not yet started, StreamingContext
 */
private def createStreamingContext: StreamingContext = {
  System.setProperty(HADOOP_HOME_DIR, configContainer.getHadoopHomeDir)
  // NOTE(review): presumably set before the SparkConf is created so that it is
  // picked up as a spark.* system property - keep this ordering.
  System.setProperty("spark.streaming.concurrentJobs",
    String.valueOf(configContainer.getStreamingConcurrentJobs))
  // `val`, not `def`: the configuration only needs to be built once.
  val sparkConf = createSparkConf()
  val ssc = new StreamingContext(sparkConf,
    Durations.seconds(configContainer.getStreamingContextDurationSeconds))
  // The description literal was split across two lines by mail wrapping; rejoined here.
  ssc.sparkContext.setJobGroup("main_streaming_job", "streaming context start")
  ssc.sparkContext.setLocalProperty("spark.scheduler.pool", "real_time_pool")
  ssc
}
/**
 * Builds the SparkConf for the remote standalone master.
 *
 * Configures the master URL, application name, fair-scheduler allocation
 * file and mode, and registers the shaded application jar (first entry of
 * `configContainer.getApplicationJarPaths`) via `setJars`.
 *
 * @return the fully populated SparkConf
 */
private def createSparkConf(): SparkConf = {
  val masterString = "spark://<<my_pc>>:7077"
  // Must be a `val`. Declared as `def conf`, every reference below created a
  // brand-new SparkConf, so the allocation file and setJars(...) were applied
  // to throwaway instances and the returned conf carried no jars at all.
  val conf = new SparkConf().setMaster(masterString).setAppName("devAppRem")
  // Local alternative (needs no jars because everything is on the driver classpath):
  // val conf = new SparkConf().setMaster("local[4]").setAppName("devAppLocal")
  conf.set("spark.scheduler.allocation.file", "D:\\valid_path_to\\fairscheduler.xml")
  val pathToShadedApplicationJar: String = configContainer.getApplicationJarPaths.get(0)
  val jars: Array[String] = Array[String](pathToShadedApplicationJar)
  conf.setJars(jars)
  conf.set("spark.scheduler.mode", "FAIR")
  conf
}
/**
 * Creates a Kafka input stream via `KafkaUtils.createStream` and keeps only
 * the second element of each received pair.
 *
 * @param ssc        streaming context the stream is attached to
 * @param topics     comma-separated list of topic names
 * @param numThreads value paired with every topic in the topic map
 * @return a DStream of the second tuple element of each Kafka record
 */
private def createKafkaDStream(ssc: StreamingContext, topics: String,
    numThreads: Int): DStream[String] = {
  val quorum: String = configContainer.getZkQuorum
  val consumerGroup: String = configContainer.getGroupId
  // One map entry per topic name, each mapped to the same numThreads value.
  val threadsPerTopic = topics.split(",").map(topic => (topic, numThreads)).toMap
  val keyedMessages = KafkaUtils.createStream(ssc, quorum, consumerGroup, threadsPerTopic,
    StorageLevel.MEMORY_AND_DISK_SER)
  keyedMessages.map(record => record._2)
}
}
The error that I get is:
2015-11-18 12:58:33.755 WARN 6132 --- [result-getter-2]
o.apache.spark.scheduler.TaskSetManager : Lost task 0.0 in stage 2.0 (TID
70, 169.254.95.56): java.io.IOException: java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka.KafkaReceiver
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at
org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.streaming.kafka.KafkaReceiver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
at
org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
... 19 more
I’m building using maven to create a shaded jar file.
Properties in the parent pom are:
<!-- Version properties declared in the parent pom. -->
<properties>
<springframework.version>3.2.13.RELEASE</springframework.version>
<springframework.integration.version>2.2.6.RELEASE</springframework.integration.version>
<ibm.mq.version>7.0.1.3</ibm.mq.version>
<hibernate.version>3.5.6-Final</hibernate.version>
<hibernate.jpa.version>1.0.0-CR-1</hibernate.jpa.version>
<validation-api.version>1.0.0.GA</validation-api.version>
<hibernate-validator.version>4.3.0.FINAL</hibernate-validator.version>
<jackson.version>1.8.5</jackson.version>
<powermock.version>1.4.12</powermock.version>
<!-- The three properties below are referenced by the shaded-jar pom:
     scala.binary.version as the artifactId suffix of the Scala/Spark/Kafka artifacts,
     spark.version for the org.apache.spark dependencies,
     scala.version for scala-library and scala-compiler. -->
<scala.binary.version>2.10</scala.binary.version>
<spark.version>1.5.2</spark.version>
<scala.version>2.10.5</scala.version>
</properties>
The pom file for the shaded jar is:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>my-spark-client</artifactId>
...
<build>
<finalName>my-spark-client</finalName>
<plugins>
<!-- Runs the shade goal in the package phase to produce the shaded application jar. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<!-- Applied to every artifact (*:*): leave out DataNucleus classes and the
     META-INF signature files (*.SF, *.DSA, *.RSA). -->
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>org/datanucleus/**</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!-- Resource transformers: AppendingTransformer is configured for reference.conf;
     ServicesResourceTransformer presumably merges META-INF/services entries
     (confirm against the shade plugin documentation). -->
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"
/>
</transformers>
<!-- Artifacts excluded from the shaded jar as a whole. -->
<artifactSet>
<excludes>
<exclude>classworlds:classworlds</exclude>
<exclude>junit:junit</exclude>
<exclude>jmock:*</exclude>
</excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.0</version>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>compile-tests-scala</id>
<phase>compile</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<scope>test</scope>
</dependency>
...
...
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
<version>${scala.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
<version>0.8.2.1</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- Spark modules, all at ${spark.version} for Scala ${scala.binary.version}.
     NOTE(review): spark-core is declared with scope "provided", while
     spark-streaming-kafka, spark-streaming and spark-sql declare no scope and
     therefore use Maven's default (compile) scope. Confirm this
     mix of scopes is intended. -->
<!-- Kafka integration for Spark Streaming; no scope declared. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>jcl-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>javax.servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spark Streaming; no scope declared. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>jcl-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- Spark core; the only Spark module marked "provided". -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>jcl-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>javax.servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spark SQL; no scope declared. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>jcl-over-slf4j</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
</dependencies>
....
</project>
org.apache.spark.streaming.kafka.KafkaReceiver is inside the
spark-streaming-kafka jar file, so I’m not sure why I get the
ClassNotFoundException.
Please help.
Thanks,
Tim
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/kafka-streaminf-1-5-2-ClassNotFoundException-org-apache-spark-streaming-kafka-KafkaReceiver-tp25408.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]