*I am new to Spark world and Job Server My Code :*
package spark.jobserver import java.nio.ByteBuffer import scala.collection.JavaConversions._ import scala.collection.mutable.ListBuffer import scala.collection.immutable.Map import org.apache.cassandra.hadoop.ConfigHelper import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat import org.apache.cassandra.hadoop.cql3.CqlConfigHelper import org.apache.cassandra.hadoop.cql3.CqlOutputFormat import org.apache.cassandra.utils.ByteBufferUtil import org.apache.hadoop.mapreduce.Job import com.typesafe.config.{Config, ConfigFactory} import org.apache.spark._ import org.apache.spark.SparkContext._ import scala.util.Try object CassandraCQLTest extends SparkJob{ def main(args: Array[String]) { val sc = new SparkContext("local[4]", "CassandraCQLTest") sc.addJar("/extra_data/spark-cassandra-connector/spark-cassandra-connector-java/target/scala-2.10/spark-cassandra-connector-java-assembly-1.3.0-SNAPSHOT.jar"); val config = ConfigFactory.parseString("") val results = runJob(sc, config) println("Result is " + "test") } override def validate(sc: SparkContext, config: Config): SparkJobValidation = { Try(config.getString("input.string")) .map(x => SparkJobValid) .getOrElse(SparkJobInvalid("No input.string config param")) } override def runJob(sc: SparkContext, config: Config): Any = { val cHost: String = "localhost" val cPort: String = "9160" val KeySpace = "retail" val InputColumnFamily = "ordercf" val OutputColumnFamily = "salecount" val job = new Job() job.setInputFormatClass(classOf[CqlPagingInputFormat]) ConfigHelper.setInputInitialAddress(job.getConfiguration(), cHost) ConfigHelper.setInputRpcPort(job.getConfiguration(), cPort) ConfigHelper.setInputColumnFamily(job.getConfiguration(), KeySpace, InputColumnFamily) ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner") CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3") /** CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "user_id='bob'") */ /** An UPDATE writes one or more columns to a record in a Cassandra column family */ val query = "UPDATE " + KeySpace + "." + OutputColumnFamily + " SET sale_count = ? " CqlConfigHelper.setOutputCql(job.getConfiguration(), query) job.setOutputFormatClass(classOf[CqlOutputFormat]) ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KeySpace, OutputColumnFamily) ConfigHelper.setOutputInitialAddress(job.getConfiguration(), cHost) ConfigHelper.setOutputRpcPort(job.getConfiguration(), cPort) ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner") val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[CqlPagingInputFormat], classOf[java.util.Map[String,ByteBuffer]], classOf[java.util.Map[String,ByteBuffer]]) val productSaleRDD = casRdd.map { case (key, value) => { (ByteBufferUtil.string(value.get("prod_id")), ByteBufferUtil.toInt(value.get("quantity"))) } } val aggregatedRDD = productSaleRDD.reduceByKey(_ + _) aggregatedRDD.collect().foreach { case (productId, saleCount) => println(productId + ":" + saleCount) } val casoutputCF = aggregatedRDD.map { case (productId, saleCount) => { val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId)) val outKey: java.util.Map[String, ByteBuffer] = outColFamKey var outColFamVal = new ListBuffer[ByteBuffer] outColFamVal += ByteBufferUtil.bytes(saleCount) val outVal: java.util.List[ByteBuffer] = outColFamVal (outKey, outVal) } } casoutputCF.saveAsNewAPIHadoopFile( KeySpace, classOf[java.util.Map[String, ByteBuffer]], classOf[java.util.List[ByteBuffer]], classOf[CqlOutputFormat], job.getConfiguration() ) casRdd.count } } *When I push the Jar using spark-jobServer and execute it I get this on spark-jobserver terminal * job-server[ERROR] Exception in thread "pool-1-thread-1" java.lang.NoClassDefFoundError: org/apache/cassandra/hadoop/cql3/CqlPagingInputFormat job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:46) job-server[ERROR] at spark.jobserver.CassandraCQLTest$.runJob(CassandraCQLTest.scala:21) job-server[ERROR] at spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235) job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) job-server[ERROR] at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) job-server[ERROR] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) job-server[ERROR] at java.lang.Thread.run(Thread.java:745) job-server[ERROR] Caused by: java.lang.ClassNotFoundException: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:366) job-server[ERROR] at java.net.URLClassLoader$1.run(URLClassLoader.java:355) job-server[ERROR] at java.security.AccessController.doPrivileged(Native Method) job-server[ERROR] at java.net.URLClassLoader.findClass(URLClassLoader.java:354) job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:425) job-server[ERROR] at java.lang.ClassLoader.loadClass(ClassLoader.java:358) job-server[ERROR] ... 8 more *I have already added the $EXTRA_JAR variable to my cassandra-spark-connector-assembly. Regards, Anand* -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-Connection-Issue-with-Spark-jobserver-tp22587.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org