*I am new to Spark world and Job Server

My Code :*

package spark.jobserver

import java.nio.ByteBuffer

import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer
import scala.collection.immutable.Map

import org.apache.cassandra.hadoop.ConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper
import org.apache.cassandra.hadoop.cql3.CqlOutputFormat
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.hadoop.mapreduce.Job

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try

object CassandraCQLTest extends SparkJob{

  def main(args: Array[String]) {   
    val sc = new SparkContext("local[4]", "CassandraCQLTest")
   
sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar");
    val config = ConfigFactory.parseString("")
    val results = runJob(sc, config)
    println("Result is " + "test")
  }
  
  override def validate(sc: SparkContext, config: Config):
SparkJobValidation = {
    Try(config.getString("input.string"))
      .map(x => SparkJobValid)
      .getOrElse(SparkJobInvalid("No input.string config param"))
  }
  
  override def runJob(sc: SparkContext, config: Config): Any = {
    val cHost: String = "localhost"
    val cPort: String = "9160"
    val KeySpace = "retail"
    val InputColumnFamily = "ordercf"
    val OutputColumnFamily = "salecount"

    val job = new Job()
    job.setInputFormatClass(classOf[CqlPagingInputFormat])
    ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost)
    ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort)
    ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace,
InputColumnFamily)
    ConfigHelper.setInputPartitioner(job.getConfiguration(),
"Murmur3Partitioner")
    CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3")

    /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(),
"user_id='bob'") */

    /** An UPDATE writes one or more columns to a record in a Cassandra
column family */
    val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET
sale_count = ? "
    CqlConfigHelper.setOutputCql(job.getConfiguration(), query)

    job.setOutputFormatClass(classOf[CqlOutputFormat])
    ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace,
OutputColumnFamily)
    ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost)
    ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort)
    ConfigHelper.setOutputPartitioner(job.getConfiguration(),
"Murmur3Partitioner")

    val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
      classOf[CqlPagingInputFormat],
      classOf[java.util.Map[String,ByteBuffer]],
      classOf[java.util.Map[String,ByteBuffer]])

    
    val productSaleRDD = casRdd.map {
      case (key, value) => {
        (ByteBufferUtil.string(value.get("prod_id")),
ByteBufferUtil.toInt(value.get("quantity")))
      }
    }
    val aggregatedRDD = productSaleRDD.reduceByKey(_ + _)
    aggregatedRDD.collect().foreach {
      case (productId, saleCount) => println(productId + ":" + saleCount)
    }

    val casoutputCF  = aggregatedRDD.map {
      case (productId, saleCount) => {
        val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId))
        val outKey: java.util.Map[String, ByteBuffer] = outColFamKey
        var outColFamVal = new ListBuffer[ByteBuffer]
        outColFamVal += ByteBufferUtil.bytes(saleCount)
        val outVal: java.util.List[ByteBuffer] = outColFamVal
       (outKey, outVal)
      }
    }

    casoutputCF.saveAsNewAPIHadoopFile(
        KeySpace,
        classOf[java.util.Map[String, ByteBuffer]],
        classOf[java.util.List[ByteBuffer]],
        classOf[CqlOutputFormat],
        job.getConfiguration()
      )
    casRdd.count
  }
}

*When I push the Jar using spark-jobServer and execute it I get this on
spark-jobserver terminal
*
job-server[ERROR] Exception in thread "pool-1-thread-1"
java.lang.NoClassDefFoundError:
org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat
job-server[ERROR]       at
spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46)
job-server[ERROR]       at
spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21)
job-server[ERROR]       at
spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235)
job-server[ERROR]       at
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
job-server[ERROR]       at
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
job-server[ERROR]       at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
job-server[ERROR]       at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
job-server[ERROR]       at java.lang.Thread.run(Thread.java:745)
job-server[ERROR] Caused by: java.lang.ClassNotFoundException:
org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
job-server[ERROR]       at 
java.net.URLClassLoader$1.run(URLClassLoader.java:366)
job-server[ERROR]       at 
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
job-server[ERROR]       at java.security.AccessController.doPrivileged(Native
Method)
job-server[ERROR]       at
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
job-server[ERROR]       at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
job-server[ERROR]       at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
job-server[ERROR]       ... 8 more

*I have already added the $EXTRA_JAR variable to my
cassandra-spark-connector-assembly.

Regards,
Anand*




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-Connection-Issue-with-Spark-jobserver-tp22587.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to