Hi,
I am creating a Cassandra Java RDD and transforming it with the where
clause.
It works fine when I run it outside mapValues, but when I put the same code
inside mapValues I get an error while the transformation is being created.
Below is my sample code:
// final so the anonymous Function below can reference it
final CassandraJavaRDD<ReferenceData> cassandraRefTable = javaFunctions(sc)
        .cassandraTable("reference_data", "dept_reference_data", ReferenceData.class);

JavaPairRDD<String, Employee> joinedRdd = rdd.mapValues(new Function<Employee, Employee>() {
    public Employee call(Employee employee) throws Exception {
        ReferenceData data = null;
        if (employee.getDepartment() != null) {
            // this where() call is where the NullPointerException below is thrown
            data = cassandraRefTable.where("postal_plus=?", employee.getPostalPlus()).first();
            System.out.println(data.toCSV());
        }
        if (data != null) {
            // call setters on employee
        }
        return employee;
    }
});
I get this error:
java.lang.NullPointerException
    at org.apache.spark.rdd.RDD.<init>(RDD.scala:125)
    at com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:47)
    at com.datastax.spark.connector.rdd.CassandraRDD.copy(CassandraRDD.scala:70)
    at com.datastax.spark.connector.rdd.CassandraRDD.where(CassandraRDD.scala:77)
    at com.datastax.spark.connector.rdd.CassandraJavaRDD.where(CassandraJavaRDD.java:54)
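The NPE is consistent with a known Spark limitation: RDD transformations and actions can only be invoked by the driver, not from inside another RDD's transformation. When cassandraRefTable is captured in the mapValues closure and shipped to an executor, its SparkContext reference is transient, so where(), which builds a new CassandraRDD through copy(), most likely reaches the RDD constructor with a null context. A common workaround, assuming the reference table is small enough to fit in memory, is to collect() it on the driver, index it in a Map, broadcast that Map with sc.broadcast(...), and do the lookup inside mapValues through the broadcast's value(). The sketch below shows only that lookup logic in plain Java so it runs without a cluster; the ReferenceData and Employee fields, the setter, and the use of the department as the lookup key are hypothetical stand-ins, not the real classes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of a per-record lookup against a local Map instead of another RDD.
// In Spark, the Map would typically be built on the driver and read inside mapValues
// through a Broadcast's value(). All class fields below are hypothetical.
public class ReferenceLookupSketch {

    // Hypothetical stand-in for the ReferenceData bean.
    public static class ReferenceData {
        private final String key;
        private final String departmentName;

        public ReferenceData(String key, String departmentName) {
            this.key = key;
            this.departmentName = departmentName;
        }

        public String getKey() { return key; }
        public String getDepartmentName() { return departmentName; }
    }

    // Hypothetical stand-in for the Employee bean.
    public static class Employee {
        private final String department;
        private String departmentName;

        public Employee(String department) { this.department = department; }

        public String getDepartment() { return department; }
        public String getDepartmentName() { return departmentName; }
        public void setDepartmentName(String name) { this.departmentName = name; }
    }

    // Index reference rows by key. In Spark this would run on the driver,
    // e.g. over the List returned by collect() on the reference RDD.
    public static Map<String, ReferenceData> index(Iterable<ReferenceData> rows) {
        Map<String, ReferenceData> byKey = new HashMap<>();
        for (ReferenceData row : rows) {
            byKey.put(row.getKey(), row);
        }
        return byKey;
    }

    // Same shape as the body of call(): look the row up, then call setters if found.
    public static Employee enrich(Employee employee, Map<String, ReferenceData> lookup) {
        if (employee.getDepartment() != null) {
            ReferenceData data = lookup.get(employee.getDepartment());
            if (data != null) {
                employee.setDepartmentName(data.getDepartmentName());
            }
        }
        return employee;
    }

    public static void main(String[] args) {
        Map<String, ReferenceData> lookup = index(Arrays.asList(new ReferenceData("D1", "Sales")));
        Employee e = enrich(new Employee("D1"), lookup);
        System.out.println(e.getDepartmentName()); // prints Sales
    }
}
```

Because the Map is an ordinary serializable Java object, nothing inside the function needs a SparkContext, which avoids the nested-RDD problem entirely.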
Thanks for any help!
Regards
Ankur