Nice catch, Brad, and thanks to Yin and Davies for getting on it so quickly.
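Until that fix is in a release, it may help to check whether a data set has the shape that failed in Brad's tests below. The failing pattern is a key whose value is a JSON object in some records but is missing or null in others. Nested lists, and objects that are always present (even when empty), worked. Below is a minimal pure-Python sketch under that assumption. `object_keys_with_gaps` is a hypothetical helper, not a PySpark API, and it only inspects top-level keys.

```python
import json


def object_keys_with_gaps(json_lines):
    """Hypothetical helper (not a PySpark API).

    Returns the sorted top-level keys whose value is a JSON object in at
    least one record but is missing or null in at least one other record,
    the pattern that raised NullPointerException in Brad's tests.
    Only top-level keys are checked; nested objects are not inspected.
    """
    records = [json.loads(line) for line in json_lines]
    object_keys = set()
    for record in records:
        for key, value in record.items():
            if isinstance(value, dict):
                object_keys.add(key)
    # record.get(key) is None covers both "key absent" and "key: null".
    return sorted(k for k in object_keys
                  if any(record.get(k) is None for record in records))


# Two inputs from the thread: the first failed on collect(), the second worked.
print(object_keys_with_gaps(['{}', '{"key0": {"key1": "value1"}}']))            # ['key0']
print(object_keys_with_gaps(['{"key0": {}}', '{"key0": {"key1": "value1"}}']))  # []
```

An empty list suggests the data avoids the reported pattern. It says nothing about other bugs, such as the nested-array pickling error on branch-1.0 further down.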
On Wed, Aug 6, 2014 at 2:45 AM, Davies Liu <dav...@databricks.com> wrote:
> There is a PR to fix this: https://github.com/apache/spark/pull/1802
>
> On Tue, Aug 5, 2014 at 10:11 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>> I concur that printSchema works; it's only operations that actually use the
>> data that run into trouble.
>>
>> Thanks for posting the bug.
>>
>> -Brad
>>
>> On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai <yh...@databricks.com> wrote:
>>> I tried jsonRDD(...).printSchema() and it worked. It seems the problem occurs
>>> when we take the data back to the Python side: SchemaRDD#javaToPython failed
>>> on your cases. I have created
>>> https://issues.apache.org/jira/browse/SPARK-2875 to track it.
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>>>> Hi All,
>>>>
>>>> I checked out and built master. Note that Maven had a problem building
>>>> Kafka (in my case, at least); I was unable to fix this easily, so I moved
>>>> on, since it seemed unlikely to have any influence on the problem at hand.
>>>>
>>>> Master improves functionality (including the example Nicholas just
>>>> demonstrated), but unfortunately there still seems to be a bug related to
>>>> using dictionaries as values. I've put some code below to illustrate the
>>>> bug.
>>>>
>>>> # dictionary as value works fine
>>>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value"}}'])).collect()
>>>> [Row(key0=Row(key1=u'value'))]
>>>>
>>>> # dictionary as value works fine, even when inner keys are varied
>>>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}', '{"key0": {"key2": "value2"}}'])).collect()
>>>> [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))]
>>>>
>>>> # dictionary as value works fine when inner keys are missing and outer key is present
>>>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1": "value1"}}'])).collect()
>>>> [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]
>>>>
>>>> # dictionary as value FAILS when outer key is missing
>>>> > print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1": "value1"}}'])).collect()
>>>> Py4JJavaError: An error occurred while calling o84.collect.
>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in stage 7.0 (TID 242, engelland.research.intel-research.net): java.lang.NullPointerException...
>>>>
>>>> # dictionary as value FAILS when outer key is present with null value
>>>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0": {"key1": "value1"}}'])).collect()
>>>> Py4JJavaError: An error occurred while calling o98.collect.
>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in stage 9.0 (TID 305, kunitz.research.intel-research.net): java.lang.NullPointerException...
>>>>
>>>> # nested lists work even when outer key is missing
>>>> > print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0", "item1"], ["item2", "item3"]]}'])).collect()
>>>> [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]
>>>>
>>>> Is anyone able to replicate this behavior?
>>>>
>>>> -Brad
>>>>
>>>> On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>> We try to keep master very stable, but this is where active development
>>>>> happens. YMMV, but a lot of people do run very close to master without
>>>>> incident (myself included).
>>>>>
>>>>> branch-1.0 has been cut for a while and we only merge bug fixes into it
>>>>> (this is stricter for non-alpha components like Spark core). For Spark
>>>>> SQL, this branch is pretty far behind, as the project is very young and we
>>>>> are fixing bugs / adding features very rapidly compared with Spark core.
>>>>>
>>>>> branch-1.1 was just cut and is being QAed for a release. At this point
>>>>> it's likely the same as master, but that will change as features start
>>>>> getting added to master in the coming weeks.
>>>>>
>>>>> On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>>>>> collect() works, too.
>>>>>>
>>>>>> >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])).collect()
>>>>>> [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]
>>>>>>
>>>>>> Can’t answer your question about branch stability, though. Spark is a
>>>>>> very active project, so stuff is happening all the time.
>>>>>>
>>>>>> Nick
>>>>>>
>>>>>> On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>>>>>>> Hi Nick,
>>>>>>>
>>>>>>> Can you check that the call to "collect()" works as well as
>>>>>>> "printSchema()"? In my experience, "printSchema()" works fine, but
>>>>>>> then it crashes on "collect()".
>>>>>>>
>>>>>>> In general, should I expect master (which seems to be on
>>>>>>> branch-1.1) to be any more or less stable than branch-1.0? While it would
>>>>>>> be great to have this fixed, it would be good to know whether I should
>>>>>>> expect lots of other instability.
>>>>>>>
>>>>>>> best,
>>>>>>> -Brad
>>>>>>>
>>>>>>> On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>>>>>>>> This looks to be fixed in master:
>>>>>>>>
>>>>>>>> >>> from pyspark.sql import SQLContext
>>>>>>>> >>> sqlContext = SQLContext(sc)
>>>>>>>> >>> sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])
>>>>>>>> ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315
>>>>>>>> >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}']))
>>>>>>>> MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408
>>>>>>>> >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])).printSchema()
>>>>>>>> root
>>>>>>>>  |-- foo: array (nullable = true)
>>>>>>>>  |    |-- element: array (containsNull = false)
>>>>>>>>  |    |    |-- element: integer (containsNull = false)
>>>>>>>>
>>>>>>>> >>>
>>>>>>>>
>>>>>>>> Nick
>>>>>>>>
>>>>>>>> On Tue, Aug 5, 2014 at 7:12 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I've built and deployed the current head of branch-1.0, but it seems
>>>>>>>>> to have only partly fixed the bug.
>>>>>>>>>
>>>>>>>>> This code now runs as expected with the indicated output:
>>>>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[1,2,3]}', '{"foo":[4,5,6]}']))
>>>>>>>>> > srdd.printSchema()
>>>>>>>>> root
>>>>>>>>>  |-- foo: ArrayType[IntegerType]
>>>>>>>>> > srdd.collect()
>>>>>>>>> [{u'foo': [1, 2, 3]}, {u'foo': [4, 5, 6]}]
>>>>>>>>>
>>>>>>>>> This code still crashes:
>>>>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}']))
>>>>>>>>> > srdd.printSchema()
>>>>>>>>> root
>>>>>>>>>  |-- foo: ArrayType[ArrayType(IntegerType)]
>>>>>>>>> > srdd.collect()
>>>>>>>>> Py4JJavaError: An error occurred while calling o63.collect.
>>>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 3.0:29 failed 4 times, most recent failure: Exception failure in TID 67 on host kunitz.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>>>
>>>>>>>>> I may be able to see if this is fixed in master, but since it's not
>>>>>>>>> fixed in 1.0.3 it seems unlikely to be fixed in master either. I
>>>>>>>>> previously tried master as well, but ran into a build problem that did
>>>>>>>>> not occur with the 1.0 branch.
>>>>>>>>>
>>>>>>>>> Can anybody else verify that the second example still crashes (and
>>>>>>>>> is meant to work)? If so, would it be best to update SPARK-2376 or start
>>>>>>>>> a new bug?
>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-2376
>>>>>>>>>
>>>>>>>>> best,
>>>>>>>>> -Brad
>>>>>>>>>
>>>>>>>>> On Tue, Aug 5, 2014 at 12:10 PM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>>>>>>>>>> Nick: Thanks for both the original JIRA bug report and the link.
>>>>>>>>>>
>>>>>>>>>> Michael: This is on the 1.0.1 release. I'll update to master and
>>>>>>>>>> follow up if I have any problems.
>>>>>>>>>>
>>>>>>>>>> best,
>>>>>>>>>> -Brad
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>>>>>>>>>> Is this on 1.0.1? I'd suggest running this on master or the
>>>>>>>>>>> 1.1-RC, which should be coming out this week. PySpark did not have
>>>>>>>>>>> good support for nested data previously. If you still encounter issues
>>>>>>>>>>> using a more recent version, please file a JIRA. Thanks!
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller <bmill...@eecs.berkeley.edu> wrote:
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm interested in using jsonRDD and jsonFile to create a SchemaRDD
>>>>>>>>>>>> out of some JSON data I have, but I've run into some instability
>>>>>>>>>>>> involving the following Java exception:
>>>>>>>>>>>>
>>>>>>>>>>>> An error occurred while calling o1326.collect.
>>>>>>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>>>>>>
>>>>>>>>>>>> I've pasted code which produces the error as well as the full
>>>>>>>>>>>> traceback below.
>>>>>>>>>>>> Note that I don't have any problem when I parse the JSON
>>>>>>>>>>>> myself and use inferSchema.
>>>>>>>>>>>>
>>>>>>>>>>>> Is anybody able to reproduce this bug?
>>>>>>>>>>>>
>>>>>>>>>>>> -Brad
>>>>>>>>>>>>
>>>>>>>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[1,2,3]}', '{"foo":"boom", "baz":[1,2,3]}']))
>>>>>>>>>>>> > srdd.printSchema()
>>>>>>>>>>>>
>>>>>>>>>>>> root
>>>>>>>>>>>>  |-- baz: ArrayType[IntegerType]
>>>>>>>>>>>>  |-- foo: StringType
>>>>>>>>>>>>
>>>>>>>>>>>> > srdd.collect()
>>>>>>>>>>>>
>>>>>>>>>>>> ---------------------------------------------------------------------------
>>>>>>>>>>>> Py4JJavaError                             Traceback (most recent call last)
>>>>>>>>>>>> <ipython-input-89-ec7e8e8c68c4> in <module>()
>>>>>>>>>>>> ----> 1 srdd.collect()
>>>>>>>>>>>>
>>>>>>>>>>>> /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self)
>>>>>>>>>>>>     581         """
>>>>>>>>>>>>     582         with _JavaStackTrace(self.context) as st:
>>>>>>>>>>>> --> 583           bytesInJava = self._jrdd.collect().iterator()
>>>>>>>>>>>>     584         return list(self._collect_iterator_through_file(bytesInJava))
>>>>>>>>>>>>     585
>>>>>>>>>>>>
>>>>>>>>>>>> /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
>>>>>>>>>>>>     535         answer = self.gateway_client.send_command(command)
>>>>>>>>>>>>     536         return_value = get_return_value(answer, self.gateway_client,
>>>>>>>>>>>> --> 537                 self.target_id, self.name)
>>>>>>>>>>>>     538
>>>>>>>>>>>>     539         for temp_arg in temp_args:
>>>>>>>>>>>>
>>>>>>>>>>>> /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
>>>>>>>>>>>>     298                 raise Py4JJavaError(
>>>>>>>>>>>>     299                     'An error occurred while calling {0}{1}{2}.\n'.
>>>>>>>>>>>> --> 300                     format(target_id, '.', name), value)
>>>>>>>>>>>>     301             else:
>>>>>>>>>>>>     302                 raise Py4JError(
>>>>>>>>>>>>
>>>>>>>>>>>> Py4JJavaError: An error occurred while calling o1326.collect.
>>>>>>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 181.0:29 failed 4 times, most recent failure: Exception failure in TID 1664 on host neal.research.intel-research.net: net.razorvine.pickle.PickleException: couldn't introspect javabean: java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392)
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.dump(Pickler.java:95)
>>>>>>>>>>>>         net.razorvine.pickle.Pickler.dumps(Pickler.java:80)
>>>>>>>>>>>>         org.apache.spark.sql.SchemaRDD$$anonfun$javaToPython$1$$anonfun$apply$3.apply(SchemaRDD.scala:385)
>>>>>>>>>>>>         org.apache.spark.sql.SchemaRDD$$anonfun$javaToPython$1$$anonfun$apply$3.apply(SchemaRDD.scala:385)
>>>>>>>>>>>>         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294)
>>>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>>>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>>>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
>>>>>>>>>>>>         org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>>>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>>>>>>>>>>>> Driver stacktrace:
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1044)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
>>>>>>>>>>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>>>>>>>>>>>>         at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
>>>>>>>>>>>>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
>>>>>>>>>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>>>>>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>>>>>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>>>>>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>>>>>>>>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>>>>>>>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>>>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>>>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>>>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
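For anyone who needs to stay on a release without the fix, Brad's results above suggest one possible workaround. Records with `{"key0": {}}` worked, while a missing or null `key0` failed. Rewriting each record so that the affected keys always hold an object may therefore avoid the crash. Below is a minimal sketch under that assumption, not tested against Spark itself. `fill_missing_objects` is a hypothetical helper, and the caller must already know which keys hold objects.

```python
import json


def fill_missing_objects(line, object_keys):
    """Hypothetical workaround helper (not a PySpark API).

    Parses one JSON record and replaces a missing or null value for each
    key in object_keys with an empty object, the shape that worked in the
    thread. All other fields are left unchanged. Returns a JSON string
    (keys sorted, so the output is deterministic).
    """
    record = json.loads(line)
    for key in object_keys:
        if record.get(key) is None:  # key absent, or present with null
            record[key] = {}
    return json.dumps(record, sort_keys=True)


lines = ['{}', '{"key0": null}', '{"key0": {"key1": "value1"}}']
for line in lines:
    print(fill_missing_objects(line, ['key0']))
# {"key0": {}}
# {"key0": {}}
# {"key0": {"key1": "value1"}}
```

In PySpark the helper would be applied before schema inference, for example `sqlCtx.jsonRDD(raw.map(lambda s: fill_missing_objects(s, ['key0'])))`, where `raw` is assumed to be an RDD of JSON strings. This has a cost. Going by Brad's third example, a record that lacked `key0` would come back as `Row(key0=Row(key1=None))` rather than `Row(key0=None)`, so downstream code can no longer tell "absent" from "empty".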