*@Sasi*

You should be able to create a job something like this:

package io.radtech.spark.jobserver

import java.util.UUID
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.rdd.RDD
import org.joda.time.DateTime
import com.datastax.spark.connector.types.TypeConverter
import com.datastax.spark.connector.types.TypeConversionException
import com.typesafe.config.Config

case class AnalyticReport(
  deviceId: UUID,
  reportType: String,
  timestamp: DateTime,
  data: Map[String, String])

class ReadWriteCassandraJob {
}

trait AlycsReportSparkJob extends spark.jobserver.SparkJob with
    spark.jobserver.NamedRddSupport {

  val rddName = "report"

  // Validation is not really needed in this example
  def validate(sc: SparkContext, config: Config): spark.jobserver.SparkJobValidation =
    spark.jobserver.SparkJobValid
}

object ReadWriteCassandraJob extends AlycsReportSparkJob {

  val cassandraHost = "127.0.0.1"
  val keyspace = "test"
  val table = "alycs_reports_by_device"

  /*
   * Enable Cassandra-specific functions on the `SparkContext` and `RDD`:
   */
  import com.datastax.spark.connector._

  /*
   * Before creating the `SparkContext`, set the `spark.cassandra.connection.host`
   * property to the address of one of the Cassandra nodes.
   */
  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", cassandraHost)

  /*
   * Set the port to connect to. If using an embedded instance, set it to 9142;
   * otherwise the default is 9042.
   */
  conf.set("spark.cassandra.connection.native.port", "9042")


  override def runJob(sc: SparkContext, config: Config) = {
    // Read table test.alycs_reports_by_device and print its contents:

    val rdd = sc.cassandraTable(keyspace, table).select(
      "device_id", "report_type", "time", "data")

    rdd.collect().foreach(println)

    val rddrows = rdd.map(r =>
      AnalyticReport(r.getUUID("device_id"),
        r.getString("report_type"),
        new org.joda.time.DateTime(r.getDate("time")),
        r.getMap[String, String]("data")))

    rddrows.collect.foreach(println)

  }

}
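
Since the object is called ReadWriteCassandraJob you will probably also want the
write side. A minimal sketch, assuming you simply persist the same rows back to
the same table just for illustration (any target table and column set works the
same way), could go at the end of runJob:

    // Write the rows back out.  Converting the Joda DateTime to a
    // java.util.Date avoids needing a custom TypeConverter registration.
    rddrows
      .map(r => (r.deviceId, r.reportType, r.timestamp.toDate, r.data))
      .saveToCassandra(keyspace, table,
        SomeColumns("device_id", "report_type", "time", "data"))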

Then create a custom Spark context settings file for the job at
src/main/resources/spark.context-settings.config. Note that the jar versions
below are almost certainly not current; I don't have the latest ones off the
top of my head. If you are using the uber / fat jar from
spark-cassandra-connector, simply place that here instead; I believe it is
named spark-cassandra-connector-assembly-1.1.0.jar (see the single-entry
variant after the config block).

spark.context-settings {
    spark.cores.max = 4
    spark.cassandra.connection.host = "127.0.0.1"
    dependent-jar-uris = [
      "local://sparkshell-lib/spark-cassandra-connector_2.10-1.0.0-rc4.jar",
      "local://sparkshell-lib/cassandra-clientutil-2.0.9.jar",
      "local://sparkshell-lib/cassandra-thrift-2.0.9.jar",
      "local://sparkshell-lib/cassandra-driver-core-2.0.4.jar",
      "local://sparkshell-lib/guava-15.0.jar",
      "local://sparkshell-lib/libthrift-0.9.1.jar",
      "local://sparkshell-lib/joda-convert-1.2.jar",
      "local://sparkshell-lib/joda-time-2.3.jar"
    ]
}
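
If you go the assembly / fat-jar route mentioned above, the dependent-jar-uris
list collapses to a single entry (use whatever version you actually built), e.g.:

    dependent-jar-uris = ["local://sparkshell-lib/spark-cassandra-connector-assembly-1.1.0.jar"]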

Now post the context to the job server:

radtech:spark-jobserver-example$ curl -d @src/main/resources/spark.context-settings.config -X POST 'localhost:8090/contexts/cassJob-context'
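
You can sanity check that the context came up by listing the contexts the job
server knows about:

radtech:spark-jobserver-example$ curl 'localhost:8090/contexts'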



Then execute your job:

curl --data-binary @target/scala-2.10/spark-jobserver-example_2.10-0.1.0.jar localhost:8090/jars/cassjob

curl -X POST 'localhost:8090/jobs?appName=cassjob&classPath=io.radtech.spark.jobserver.ReadWriteCassandraJob&context=cassJob-context'
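
The POST should come back with a JSON payload containing a jobId; you can then
poll for the status / result (or add &sync=true to the submission URL if you
want the call to block until the job finishes), something like:

curl 'localhost:8090/jobs/<jobId>'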

Worst case, you should be able to set these in your spark-defaults.conf,
pointing at a location that is common to all your executors:

spark.executor.extraClassPath=.....
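
For example (the /opt/spark/lib path here is just a placeholder; use whatever
directory actually holds the connector jar on every executor node):

    # conf/spark-defaults.conf
    spark.executor.extraClassPath   /opt/spark/lib/spark-cassandra-connector-assembly-1.1.0.jar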

HTH.

-Todd

On Tue, Jan 6, 2015 at 10:00 AM, bchazalet <bchaza...@companywatch.net>
wrote:

> It does not look like you're supposed to fiddle with the SparkConf and even
> SparkContext in a 'job' (again, I don't know much about jobserver), as
> you're given a SparkContext as parameter in the build method.
>
> I guess jobserver initialises the SparkConf and SparkContext itself when it
> first starts, whereas you're actually creating a new one within your job.
> The github example you mentioned doesn't do that; it just uses the context
> given as parameter:
>
> def build(sc: SparkContext): RDD[(Reputation, User)] = {
>     sc.textFile(inputPath).
>       map(User.fromRow).
>       collect {
>         case Some(user) => user.reputation -> user
>       }.
>       sortByKey(ascending = false)
>   }
>
> I am not sure either how you upload your job's jar to the server (the curl
> command you posted does not seem to do so).
>
> Maybe you could try first to make it work on its own as a regular spark
> app,
> without using jobserver.
>
