Forgot to mention: I've tested that SerIntWritable and PipelineDocumentWritable are serializable by serializing and deserializing them to and from a byte array in memory.
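For reference, the in-memory round trip I mean looks roughly like the sketch below. This is a minimal illustration, not my actual test: `RoundTripCheck`, `roundTrip`, and `IntHolder` are hypothetical names, and `IntHolder` is only a stand-in for a class like SerIntWritable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RoundTripCheck {

    // Hypothetical stand-in for a class like SerIntWritable (not the real implementation).
    static class IntHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final int value;

        IntHolder(int value) {
            this.value = value;
        }
    }

    // Writes obj to an in-memory byte array with plain Java serialization,
    // then reads a copy back from those bytes.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        IntHolder copy = roundTrip(new IntHolder(42));
        if (copy.value != 42) {
            throw new AssertionError("round trip changed the value");
        }
        System.out.println("round trip ok: " + copy.value);
    }
}
```

Note that a check like this runs in a single JVM with a single classloader, so it only shows that the classes themselves serialize cleanly. It would not catch a classpath or library version mismatch at the point where the task is deserialized.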

On Wed, Oct 1, 2014 at 1:43 PM, Timothy Potter <[email protected]> wrote:
> I'm running into the following deserialization issue when trying to
> run a very simple Java-based application using a local Master (see
> stack trace below).
>
> My code basically queries Solr using a custom Hadoop InputFormat. I've
> hacked my code to make sure the objects involved
> (PipelineDocumentWritable and SerIntWritable) are serializable. I only
> did this to rule out any weirdness around handling Hadoop Writable
> objects. I've also tried the Kryo stuff but it seems this is a problem
> with deserializing the task. In other words, I've tried doing:
>
>   sparkConf.set("spark.serializer", KryoSerializer.class.getName());
>   sparkConf.set("spark.kryo.registrator",
>       LWKryoRegistrator.class.getName());
>
> And same problem either with or without overriding to use Kryo. In
> fact, my LWKryoRegistrator impl never gets invoked so the exception is
> happening lower down in the Spark stack.
>
> Here's the code I'm trying to run (basically query Solr through a
> custom InputFormat and then do word count on the text in the tweet_s
> field that comes back in the results):
>
> >>>>
> JobConf jobConf = new JobConf();
> jobConf.set(LWMapRedInputFormat.SOLR_ZKHOST,
>     cli.getOptionValue("zkHost", "localhost:9983"));
> jobConf.set(LWMapRedInputFormat.SOLR_COLLECTION,
>     cli.getOptionValue("collection", "collection1"));
> jobConf.set(LWMapRedInputFormat.SOLR_QUERY,
>     cli.getOptionValue("query", "*:*"));
> jobConf.set(LWMapRedInputFormat.SOLR_USE_CURSOR,
>     cli.getOptionValue("useCursorMark", "false"));
>
> JavaSparkContext jsc = new JavaSparkContext(conf);
> JavaPairRDD<SerIntWritable, PipelineDocumentWritable> solrRDD =
>     jsc.hadoopRDD(jobConf,
>         LWSerMapRedInputFormat.class, SerIntWritable.class,
>         PipelineDocumentWritable.class);
>
> JavaRDD<String> words = solrRDD.flatMap(new
>     FlatMapFunction<Tuple2<SerIntWritable, PipelineDocumentWritable>,
>     String>() {
>   @Override
>   public Iterable<String> call(Tuple2<SerIntWritable,
>       PipelineDocumentWritable> arg) {
>     String str =
>         arg._2.getPipelineDocument().getFirstField("tweet_s").toString();
>     str = str.toLowerCase().replaceAll("[.,!?\n]", " ");
>     return Arrays.asList(str.split(" "));
>   }
> });
>
> JavaPairRDD<String, Integer> ones = words.mapToPair(new
>     PairFunction<String, String, Integer>() {
>   public Tuple2<String, Integer> call(String s) {
>     return new Tuple2<String, Integer>(s, 1);
>   }
> });
> JavaPairRDD<String, Integer> counts = ones.reduceByKey(new
>     Function2<Integer, Integer, Integer>() {
>   public Integer call(Integer i1, Integer i2) {
>     return i1 + i2;
>   }
> });
>
> counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
>   public void call(Tuple2<String, Integer> pair) throws Exception {
>     System.out.println("\n\n >> "+pair._1+": "+pair._2+" \n");
>   }
> });
>
> <<<<
>
> This is on my local machine running Spark 1.1.0 pre-built for Hadoop
> 2.4. My custom InputFormat is also built against Hadoop 2.4. Here's
> how I run this application:
>
> [~/tools/spark-1.1.0-bin-hadoop2.4/bin]$ ./spark-submit --master local
>     --class com.lucidworks.spark.SparkApp
>     spark-proto-1.0-SNAPSHOT-with-deps.jar query-solr
>     -zkHost=localhost:2181/local410 -collection=foo -query="*:*"
>     -useCursorMark -v
>
> I've turned on DEBUG logging and there's not much useful information
> beyond the stack trace.
>
> Here's the exception I'm getting:
>
> >>>>
> 2014-10-01 13:21:06,295 [main] INFO DAGScheduler - Failed to run foreach at SolrQueryProcessor.java:83
>
> Exception in thread "main" org.apache.spark.SparkException: Job
> aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
> recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
> java.lang.IllegalStateException: unread block data
>     java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2421)
>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1382)
>     java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>     java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>     java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>     java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>     java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>     org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>     org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Anyone have any suggestions on how to resolve this? I've seen other
> reports of this "unread block data" problem (e.g.
> https://issues.apache.org/jira/browse/SPARK-1867) but no real
> solutions other than there's some mismatch between Hadoop versions,
> which I don't think is the case here since I'm not running a real
> cluster (master=local). Moreover, I get the same error when running
> this code in a JUnit test.
>
> Thanks in advance for any guidance you can provide.
>
> Cheers,
> Tim
