Hi everyone,
I have a job that reads segment data from Druid and converts it to CSV.
When I run it in local mode it works fine:
/home/airflow/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
  --driver-memory 1g \
  --master "local[4]" \
  --files /home/airflow/spark-jobs/forecast_jobs/prod.conf \
  --conf spark.executor.extraJavaOptions=-Dconfig.fuction.conf \
  --conf 'spark.driver.extraJavaOptions=-Dconfig.file=/home/airflow/spark-jobs/forecast_jobs/prod.conf' \
  --class com.ants.druid.spark.GenerateForecastData \
  --driver-class-path /home/airflow/spark-jobs/forecast_jobs/classmate-0.5.4.jar:/home/airflow/spark-jobs/forecast_jobs/jboss-logging-3.3.0.Final.jar:/home/airflow/spark-jobs/forecast_jobs/hibernate-validator-5.1.3.Final.jar \
  --jars /home/airflow/spark-jobs/forecast_jobs/generate_forecast_data-assembly-1.0-deps.jar \
  /home/airflow/spark-jobs/forecast_jobs/generate_forecast_data.jar 2016-12-01-02
But when I switch to submitting to Mesos, I get this error:
16/12/13 15:08:45 INFO Guice: An exception was caught and reported. Message: javax.validation.ValidationException: Unable to create a Configuration, because no Bean Validation provider could be found. Add a provider like Hibernate Validator (RI) to your classpath.
javax.validation.ValidationException: Unable to create a Configuration, because no Bean Validation provider could be found. Add a provider like Hibernate Validator (RI) to your classpath.
        at javax.validation.Validation$GenericBootstrapImpl.configure(Validation.java:271)
        at javax.validation.Validation.buildDefaultValidatorFactory(Validation.java:110)
        at io.druid.guice.ConfigModule.configure(ConfigModule.java:39)
        at com.google.inject.spi.Elements$RecordingBinder.install(Elements.java:223)
        at com.google.inject.spi.Elements.getElements(Elements.java:101)
        at com.google.inject.internal.InjectorShell$Builder.build(InjectorShell.java:133)
        at com.google.inject.internal.InternalInjectorCreator.build(InternalInjectorCreator.java:103)
        at com.google.inject.Guice.createInjector(Guice.java:95)
        at com.google.inject.Guice.createInjector(Guice.java:72)
        at io.druid.guice.GuiceInjectors.makeStartupInjector(GuiceInjectors.java:59)
        at io.druid.indexer.HadoopDruidIndexerConfig.<clinit>(HadoopDruidIndexerConfig.java:99)
        at io.druid.indexer.hadoop.DatasourceInputSplit.readFields(DatasourceInputSplit.java:87)
        at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
        at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
        at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:45)
        at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply(SerializableWritable.scala:41)
        at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply(SerializableWritable.scala:41)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
        at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:41)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:253)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
I tried setting the following in spark-defaults.conf:

spark.driver.memory 5g
spark.executor.extraClassPath /home/airflow/spark-jobs/forecast_jobs/classmate-0.5.4.jar:/home/airflow/spark-jobs/forecast_jobs/hibernate-validator-5.1.3.Final.jar:/home/airflow/spark-jobs/forecast_jobs/jboss-logging-3.3.0.Final.jar
spark.driver.extraClassPath /home/airflow/spark-jobs/forecast_jobs/classmate-0.5.4.jar:/home/airflow/spark-jobs/forecast_jobs/hibernate-validator-5.1.3.Final.jar:/home/airflow/spark-jobs/forecast_jobs/jboss-logging-3.3.0.Final.jar

But it still throws the same error.
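Maybe worth noting: as I understand it, spark.executor.extraClassPath entries are resolved on the Mesos agent machines, so those /home/airflow/... paths would have to exist on every agent. A variant I could try instead (just a sketch, with the same paths as above) is letting Spark ship the validator jars to the executors itself via --jars:

```shell
# Sketch only (same paths as above): --jars makes Spark distribute these jars
# to the executors, instead of relying on the files already existing at the
# same local path on every Mesos agent.
/home/airflow/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
  --master mesos://master:5050 \
  --class com.ants.druid.spark.GenerateForecastData \
  --jars /home/airflow/spark-jobs/forecast_jobs/hibernate-validator-5.1.3.Final.jar,/home/airflow/spark-jobs/forecast_jobs/classmate-0.5.4.jar,/home/airflow/spark-jobs/forecast_jobs/jboss-logging-3.3.0.Final.jar,/home/airflow/spark-jobs/forecast_jobs/generate_forecast_data-assembly-1.0-deps.jar \
  /home/airflow/spark-jobs/forecast_jobs/generate_forecast_data.jar 2016-12-01-02
```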
My build.sbt:
name := "generate_forecast_data"
version := "1.0"
scalaVersion := "2.11.7"
val druid_version = "0.9.1.1"
val json4sVersion = "3.5.0"
val sparkVersion = "2.0.2"
val hadoopVersion = "2.7.0"
val guava_version = "16.0.1"
val sparkDep = ("org.apache.spark" %% "spark-core" % sparkVersion
exclude("org.roaringbitmap", "RoaringBitmap")
exclude("log4j", "log4j")
exclude("org.slf4j", "slf4j-log4j12")
exclude("com.google.guava", "guava")
exclude("org.apache.hadoop", "hadoop-client")
exclude("org.apache.hadoop", "hadoop-yarn-api")
exclude("org.apache.hadoop", "hadoop-yarn-common")
exclude("com.sun.jersey", "jersey-server")
exclude("com.sun.jersey", "jersey-core")
exclude("com.sun.jersey", "jersey-core")
exclude("com.sun.jersey.contribs", "jersey-guice")
exclude("org.eclipse.jetty", "jetty-server")
exclude("org.eclipse.jetty", "jetty-plus")
exclude("org.eclipse.jetty", "jetty-util")
exclude("org.eclipse.jetty", "jetty-http")
exclude("org.eclipse.jetty", "jetty-servlet")
exclude("com.esotericsoftware.minlog", "minlog")
/*
exclude("com.fasterxml.jackson.core", "jackson-core")
exclude("com.fasterxml.jackson.core", "jackson-annotations")
exclude("com.fasterxml.jackson.dataformat", "jackson-dataformat-smile")
exclude("com.fasterxml.jackson.datatype", "jackson-datatype-joda")
exclude("com.fasterxml.jackson.core", "jackson-databind")
*/
exclude("io.netty", "netty")
exclude("org.apache.mesos", "mesos")
) % "provided"
libraryDependencies += sparkDep
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % "provided"
val hadoopDep = ("org.apache.hadoop" % "hadoop-client" % hadoopVersion
exclude("asm", "asm")
exclude("org.ow2.asm", "asm")
exclude("org.jboss.netty", "netty")
exclude("commons-logging", "commons-logging")
exclude("com.google.guava", "guava")
exclude("org.mortbay.jetty", "servlet-api-2.5")
exclude("javax.servlet", "servlet-api")
exclude("junit", "junit")
exclude("org.slf4j", "slf4j-log4j12")
exclude("log4j", "log4j")
exclude("commons-beanutils", "commons-beanutils")
exclude("org.apache.hadoop", "hadoop-yarn-api")
exclude("com.sun.jersey", "jersey-server")
exclude("com.sun.jersey", "jersey-core")
exclude("com.sun.jersey", "jersey-core")
exclude("com.sun.jersey.contribs", "jersey-guice")
exclude("org.eclipse.jetty", "jetty-server")
exclude("org.eclipse.jetty", "jetty-plus")
exclude("org.eclipse.jetty", "jetty-util")
exclude("org.eclipse.jetty", "jetty-http")
exclude("org.eclipse.jetty", "jetty-servlet")
exclude("commons-beanutils", "commons-beanutils-core")
exclude("com.fasterxml.jackson.core", "jackson-core")
exclude("com.fasterxml.jackson.core", "jackson-annotations")
exclude("com.fasterxml.jackson.dataformat", "jackson-dataformat-smile")
exclude("com.fasterxml.jackson.datatype", "jackson-datatype-joda")
exclude("com.fasterxml.jackson.core", "jackson-databind")
exclude("io.netty", "netty")
) % "provided"
// For Path
libraryDependencies += hadoopDep
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.4" % "test"
libraryDependencies += "io.druid" % "druid-processing" % druid_version
libraryDependencies += "io.druid" % "druid-server" % druid_version
libraryDependencies += "io.druid" % "druid-indexing-service" % druid_version
libraryDependencies += "io.druid" % "druid-indexing-hadoop" % druid_version
libraryDependencies += "org.joda" % "joda-convert" % "1.8.1" % "provided" // Prevents IntelliJ silliness and sbt warnings
libraryDependencies += "com.google.guava" % "guava" % guava_version // Prevents serde problems for guice exceptions
libraryDependencies += "com.sun.jersey" % "jersey-servlet" % "1.17.1"
libraryDependencies += "com.typesafe" % "config" % "1.3.1"
libraryDependencies += "org.json4s" %% "json4s-native" % json4sVersion
val kafkaDependencies = Seq("org.apache.kafka" % "kafka_2.11" % "0.9.0.1"
exclude("org.slf4j", "slf4j-log4j12")
exclude("log4j", "log4j")
)
libraryDependencies ++= kafkaDependencies
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
  case PathList("org", "apache", "commons", "logging", xs @ _*) => MergeStrategy.first
  case PathList("javax", "annotation", xs @ _*) => MergeStrategy.last // favor jsr305
  case PathList("mime.types") => MergeStrategy.filterDistinctLines
  case PathList("com", "google", "common", "base", xs @ _*) => MergeStrategy.last
  // spark-network-common pulls these in
  case PathList("org", "apache", "spark", "unused", xs @ _*) => MergeStrategy.first
  case PathList("META-INF", xs @ _*) =>
    xs map { _.toLowerCase } match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) =>
        MergeStrategy.discard
      case ps @ (x :: xs) if ps.last.endsWith(".sf") || ps.last.endsWith(".dsa") =>
        MergeStrategy.discard
      case "services" :: xs => MergeStrategy.filterDistinctLines
      case "jersey-module-version" :: xs => MergeStrategy.first
      case "sisu" :: xs => MergeStrategy.discard
      case "maven" :: xs => MergeStrategy.discard
      case "plexus" :: xs => MergeStrategy.discard
      case _ => MergeStrategy.discard
    }
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
assemblyJarName in assembly := "generate_forecast_data.jar"
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false, includeDependency = false)
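One detail about the merge strategy above: javax.validation discovers its provider through ServiceLoader, which reads a plain text file under META-INF/services on the classpath, so that entry has to survive assembly and actually reach the executors. As far as I understand, the registration Hibernate Validator ships looks like this (recreated here in a throwaway demo directory, purely for illustration):

```shell
# ServiceLoader reads the provider class name from this file on the classpath;
# if it is missing from whatever jar the executors load, you get exactly the
# "no Bean Validation provider could be found" error above.
# "demo" is a throwaway directory, not part of the real build.
mkdir -p demo/META-INF/services
printf 'org.hibernate.validator.HibernateValidator\n' \
  > demo/META-INF/services/javax.validation.spi.ValidationProvider
cat demo/META-INF/services/javax.validation.spi.ValidationProvider
```

So the `case "services" :: xs => MergeStrategy.filterDistinctLines` line should preserve it during assembly, but only if the jar carrying it is actually included in the assembled output.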
I even tried this:
/home/airflow/spark-2.0.2-bin-hadoop2.7/bin/spark-submit \
  --conf spark.executor.uri=http://master:8000/spark-2.0.2-bin-hadoop2.7.tgz \
  --driver-memory 1g \
  --executor-memory 5g \
  --master mesos://master:5050 \
  --total-executor-cores 4 \
  --files /home/airflow/spark-jobs/forecast_jobs/prod.conf \
  --conf spark.executor.extraJavaOptions=-Dconfig.fuction.conf \
  --conf 'spark.driver.extraJavaOptions=-Dconfig.file=/home/airflow/spark-jobs/forecast_jobs/prod.conf' \
  --class com.ants.druid.spark.GenerateForecastData \
  --conf 'spark.executor.extraClassPath=/home/airflow/spark-jobs/forecast_jobs/hibernate-validator-5.1.3.Final.jar' \
  --driver-class-path /home/airflow/spark-jobs/forecast_jobs/classmate-0.5.4.jar:/home/airflow/spark-jobs/forecast_jobs/hibernate-validator-5.1.3.Final.jar:/home/airflow/spark-jobs/forecast_jobs/jboss-logging-3.3.0.Final.jar \
  --jars /build/etl/imply-data1-1.3.0/dist/druid/lib/druid-indexing-hadoop-0.9.1.1.jar,/build/etl/imply-data1-1.3.0/dist/druid/lib/jersey-servlet-1.19.jar,/build/etl/imply-data1-1.3.0/dist/druid/lib/druid-server-0.9.1.1.jar,/build/etl/imply-data1-1.3.0/dist/druid/lib/druid-processing-0.9.1.1.jar,/home/airflow/spark-jobs/forecast_jobs/generate_forecast_data-assembly-1.0-deps.jar \
  /home/airflow/spark-jobs/forecast_jobs/generate_forecast_data.jar 2016-12-01-02
It still didn’t work.
Regards,
Chanh