I have the following code where I'm using RDD 'union' and 'subtractByKey' to
create a new baseline RDD. All of my RDDs are key/value pairs with the 'key' a
String and the 'value' a String (an XML document).

    // ******************************************************
    // Merge the daily deletes/updates/adds with the baseline
    // ******************************************************

    // Concat the Updates, Deletes into one PairRDD
    JavaPairRDD<String,String> updateDeletePairRDD = updatePairRDD.union(deletePairRDD);

    // Remove the update/delete keys from the baseline
    JavaPairRDD<String,String> workSubtractBaselinePairRDD = baselinePairRDD.subtractByKey(updateDeletePairRDD);

    // Add in the Adds
    JavaPairRDD<String,String> workAddBaselinePairRDD = workSubtractBaselinePairRDD.union(addPairRDD);

    // Add in the Updates
    JavaPairRDD<String,String> newBaselinePairRDD = workAddBaselinePairRDD.union(updatePairRDD);

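
To make the intended result of the merge concrete, here is a minimal sketch of the same steps on plain java.util maps, with no Spark involved. It is only a model of what I expect the RDD version to produce, not how Spark implements 'union' or 'subtractByKey'. The class name MergeModel, the method mergeBaseline, and the sample keys and documents are hypothetical. It also assumes every key appears at most once per input; an RDD 'union' keeps duplicate keys, which a Map cannot represent.

```java
import java.util.HashMap;
import java.util.Map;

public class MergeModel {
    // Hypothetical helper: models the merge on in-memory maps, not on RDDs.
    static Map<String, String> mergeBaseline(Map<String, String> baseline,
                                             Map<String, String> updates,
                                             Map<String, String> deletes,
                                             Map<String, String> adds) {
        // Model of the subtractByKey step: drop every baseline key that
        // appears in either the updates or the deletes.
        Map<String, String> result = new HashMap<>(baseline);
        result.keySet().removeAll(updates.keySet());
        result.keySet().removeAll(deletes.keySet());

        // Model of the two union steps: add the new records, then the
        // updated records. Assumes adds and updates share no keys.
        result.putAll(adds);
        result.putAll(updates);
        return result;
    }

    public static void main(String[] args) {
        // Sample data is made up for illustration only.
        Map<String, String> baseline = new HashMap<>();
        baseline.put("a", "<doc>a1</doc>");
        baseline.put("b", "<doc>b1</doc>");
        baseline.put("c", "<doc>c1</doc>");

        Map<String, String> updates = new HashMap<>();
        updates.put("b", "<doc>b2</doc>");

        Map<String, String> deletes = new HashMap<>();
        deletes.put("c", "<doc>c1</doc>");

        Map<String, String> adds = new HashMap<>();
        adds.put("d", "<doc>d1</doc>");

        Map<String, String> merged = mergeBaseline(baseline, updates, deletes, adds);
        System.out.println("Number of new baseline records: " + merged.size());
        System.out.println("b -> " + merged.get("b"));
    }
}
```

Under those assumptions, the expected count is the baseline size, minus the baseline keys that are updated or deleted, plus the number of adds and updates.
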
When I go to 'count' the newBaselinePairRDD

    // Output count for new baseline
    log.info("Number of new baseline records: " + newBaselinePairRDD.count());

I'm getting the following exception (the above log.info is SparkSync.java:785).
What I find odd is the reference to Spark SQL. So, I'm curious as to whether
under the covers the RDD union and/or subtractByKey are implemented with Spark
SQL. I wouldn't think so but thought I would ask. I'm also suspicious of the
reference to the '<' and whether that is because of the XML document in the
value portion of the key/value pair. Any insights would be appreciated. If there
are thoughts on how to better approach my problem (even debugging), I would be
interested in that as well. The updatePairRDD, deletePairRDD, baselinePairRDD,
addPairRDD, and updateDeletePairRDD are all 'hashPartitioned'.
It's also a bit difficult to trace things because my application is a Java
application and the stack trace references a lot of Scala and has very few
references to my application other than one (SparkSync.java:785). My application
is using Spark SQL for some other tasks, so perhaps (part of) an RDD is being
re-calculated and is resulting in this error. But, based on other logging
statements throughout my application, I don't believe this is the case.
Thanks.
Darin.
14/11/10 22:35:27 INFO scheduler.DAGScheduler: Failed to run count at SparkSync.java:785
14/11/10 22:35:27 WARN scheduler.TaskSetManager: Lost task 0.3 in stage 40.0 (TID 10674, ip-10-149-76-35.ec2.internal):
com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: java.io.StringReader@e8f759e; line: 1, column: 2]
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524)
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557)
com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475)
com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1415)
com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:679)
com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3024)
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:275)
org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:274)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:62)
org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:50)
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)