Hi All,
I am facing a weird situation which is explained below.
Scenario and Problem: I want to add two attributes to JSON object based on the
look up table values and insert the JSON to Mongo DB. I have broadcast variable
which holds look up table. However, i am not being able to access it inside
foreachPartition as you can see in the code. It does not give me any error but
simply does not display anything. Also, because of it i cant insert JSON to
Mongo DB. I cant find any explanation to this behaviour. Any explanation or
work around to make it work is much appreciated.
Note: I am using spark streaming and the file comes as micro batch to HDFS. I
want to be able to use more cores in spark streaming (i have 16 cores
available) so that processing can be done faster.
Thanks.
Regards,
Prajwol
Here is my full code:
object ProcessMicroBatchStreams {
val calculateDistance = udf {
(lat: String, lon: String) =>
GeoHash.getDistance(lat.toDouble, lon.toDouble) }
val DB_NAME = "IRT"
val COLLECTION_NAME = "sensordata"
val records = Array[String]()
def main(args: Array[String]): Unit = {
if (args.length < 0) {
System.err.println("Usage: ProcessMicroBatchStreams <master>
<input_directory>")
System.exit(1)
}
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName(this.getClass.getCanonicalName)
.set("spark.hadoop.validateOutputSpecs", "false")
/*.set("spark.executor.instances", "3")
.set("spark.executor.memory", "18g")
.set("spark.executor.cores", "9")
.set("spark.task.cpus", "1")
.set("spark.driver.memory", "10g")*/
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(60))
val sqc = new SQLContext(sc)
val gpsLookUpTable = MapInput.cacheMappingTables(sc,
sqc).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
val broadcastTable = sc.broadcast(gpsLookUpTable)
ssc.textFileStream("hdfs://localhost:9000/inputDirectory/")
.foreachRDD { rdd =>
//broadcastTable.value.show() // I can access broadcast value here
if (!rdd.partitions.isEmpty) {
val partitionedRDD = rdd.repartition(4)
partitionedRDD.foreachPartition {
partition =>
println("Inside Partition")
broadcastTable.value.show() // I cannot access broadcast value here
partition.foreach {
row =>
val items = row.split("\n")
items.foreach { item =>
val mongoColl = MongoClient()(DB_NAME)(COLLECTION_NAME)
val jsonObject = new JSONObject(item)
val latitude = jsonObject.getDouble(Constants.LATITUDE)
val longitude = jsonObject.getDouble(Constants.LONGITUDE)
// The broadcast value is not being shown here
// However, there is no error shown
// I cannot insert the value into Mongo DB
val selectedRow = broadcastTable.value
.filter("geoCode LIKE '" + GeoHash.subString(latitude,
longitude) + "%'")
.withColumn("Distance", calculateDistance(col("Lat"),
col("Lon")))
.orderBy("Distance")
.select(Constants.TRACK_KM, Constants.TRACK_NAME).take(1)
if (selectedRow.length != 0) {
jsonObject.put(Constants.TRACK_KM, selectedRow(0).get(0))
jsonObject.put(Constants.TRACK_NAME, selectedRow(0).get(1))
}
else {
jsonObject.put(Constants.TRACK_KM, "NULL")
jsonObject.put(Constants.TRACK_NAME, "NULL")
}
val record =
JSON.parse(jsonObject.toString()).asInstanceOf[DBObject]
mongoColl.insert(record)
}
}
}
}
}
sys.addShutdownHook {
ssc.stop(true, true)
}
ssc.start()
ssc.awaitTermination()
}
}