Hello Spark community and Holden,

I am trying to follow Holden Karau's SparkSQL and ElasticSearch tutorial
from Spark Summit 2014, using elasticsearch-spark 2.1.0.Beta3 together with
SparkSQL 1.2.

https://github.com/holdenk/elasticsearchspark
(Side note: this very nice tutorial does NOT build for me with sbt 0.13,
sbt-assembly 0.12.0-M1, and Spark 1.2... I hit so many problems that I just gave up.)

In any case, I am trying to work from Holden's tutorial to see
ElasticSearch and SparkSQL interoperate, with the data being a bunch of
JSON documents in ElasticSearch.

The problem is with the following code, at the line calling
sqlCtx.esRDD(..., ...):

// imports at the top of my test object (trimmed to the ones this snippet needs)
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._  // brings the esRDD() extension into scope

// this is my def main(args) inside of my test Object
    val Array(esResource, esNodes) = args.take(2)
    val conf = new SparkConf().setAppName("TestEsSpark")
    conf.set("es.index.auto.create", "true")
    val sc = new SparkContext(conf)
    val sqlCtx = new SQLContext(sc)
    import sqlCtx._
    val query = """{"query":{"match":{"schemaName":"SuperControllerRequest.json"}}}"""
    val searched = sqlCtx.esRDD(esResource, query)  // <---- PROBLEM HERE
    println(searched.schema)

I can build the assembly with sbt assembly, after much work getting SBT to
cooperate. However, at RUN TIME I get the output below: the sqlCtx.esRDD()
call fails inside elasticsearch-spark with a NoSuchMethodError on the
org.apache.spark.sql.catalyst.types.StructField constructor, and it is
thrown as soon as esRDD() tries to discover the schema, before any documents
are fetched.

This is a nightmare and I cannot get it to work. Does anybody know how to
make this extremely simple test work?

Further below is my SBT build file that I managed to get to work, borrowing
from Holden's build.sbt.

// RUN time exception
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.types.StructField.<init>(Ljava/lang/String;Lorg/apache/spark/sql/catalyst/types/DataType;Z)V
        at org.elasticsearch.spark.sql.MappingUtils$.org$elasticsearch$spark$sql$MappingUtils$$convertField(MappingUtils.scala:75)
        at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
        at org.elasticsearch.spark.sql.MappingUtils$$anonfun$convertToStruct$1.apply(MappingUtils.scala:54)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at org.elasticsearch.spark.sql.MappingUtils$.convertToStruct(MappingUtils.scala:54)
        at org.elasticsearch.spark.sql.MappingUtils$.discoverMapping(MappingUtils.scala:47)
        at org.elasticsearch.spark.sql.EsSparkSQL$.esRDD(EsSparkSQL.scala:27)
        at org.elasticsearch.spark.sql.EsSparkSQL$.esRDD(EsSparkSQL.scala:24)
        at org.elasticsearch.spark.sql.package$SQLContextFunctions.esRDD(package.scala:17)
        at testesspark.TestEsSpark$.main(TestEsSpark.scala:41)  // <---- PROBLEM HERE: this is the sqlCtx.esRDD() line in my code
        at testesspark.TestEsSpark.main(TestEsSpark.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


//build.sbt
import sbtassembly.PathList

name := "testesspark"

version := "0.1.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0"
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
    exclude("org.eclipse.jetty.orbit", "javax.transaction")
    exclude("org.eclipse.jetty.orbit", "javax.mail")
    exclude("org.eclipse.jetty.orbit", "javax.activation")
    exclude("commons-beanutils", "commons-beanutils-core")
    exclude("commons-collections", "commons-collections")
    exclude("com.esotericsoftware.minlog", "minlog")
    exclude("org.slf4j", "jcl-over-slf4j" )
    exclude("org.apache.hadoop","hadoop-yarn-api")
    exclude("org.slf4j","slf4j-api"),
  "org.apache.spark" %% "spark-sql" % "1.2.0"
    exclude("org.eclipse.jetty.orbit", "javax.servlet")
    exclude("org.eclipse.jetty.orbit", "javax.transaction")
    exclude("org.eclipse.jetty.orbit", "javax.mail")
    exclude("org.eclipse.jetty.orbit", "javax.activation")
    exclude("commons-beanutils", "commons-beanutils-core")
    exclude("commons-collections", "commons-collections")
    exclude("com.esotericsoftware.minlog", "minlog")
    exclude("org.slf4j", "jcl-over-slf4j" )
    exclude("org.slf4j","slf4j-api"),
  "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0.Beta3"
)

resolvers ++= Seq(
  Resolver.sonatypeRepo("snapshots"),
  Resolver.sonatypeRepo("public"),
  "conjars.org" at "http://conjars.org/repo",
  "JBoss Repository" at "http://repository.jboss.org/nexus/content/repositories/releases/",
  "Spray Repository" at "http://repo.spray.cc/",
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Twitter4J Repository" at "http://twitter4j.org/maven2/",
  "Apache HBase" at "https://repository.apache.org/content/repositories/releases",
  "Twitter Maven Repo" at "http://maven.twttr.com/",
  "scala-tools" at "https://oss.sonatype.org/content/groups/scala-tools",
  "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/",
  "Second Typesafe repo" at "http://repo.typesafe.com/typesafe/maven-releases/"
)

assemblyMergeStrategy in assembly :=
{
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.startsWith("META-INF") => MergeStrategy.discard
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList("org", "apache", xs @ _*) => MergeStrategy.first
  case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
  case "about.html"  => MergeStrategy.rename
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

mainClass in assembly := Some("testesspark.TestEsSpark")
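
In case the packaging itself is part of the problem, here is an untested
sketch of a variant dependency block, with the two Spark artifacts marked
"provided" so the assembly only carries elasticsearch-spark plus my own code
and the Spark 1.2 jars come from the spark-submit runtime instead; I have not
verified whether this changes the error.

// build.sbt variant (sketch, untested): let spark-submit supply Spark at run time
libraryDependencies ++= Seq(
  "org.apache.spark"  %% "spark-core"          % "1.2.0" % "provided",
  "org.apache.spark"  %% "spark-sql"           % "1.2.0" % "provided",
  "org.elasticsearch" %% "elasticsearch-spark" % "2.1.0.Beta3"
)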
