I tried jsonRDD(...).printSchema() and it worked. Seems the problem is when
we take the data back to the Python side, SchemaRDD#javaToPython failed on
your cases. I have created https://issues.apache.org/jira/browse/SPARK-2875
to track it.

Thanks,

Yin


On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller <bmill...@eecs.berkeley.edu>
wrote:

> Hi All,
>
> I checked out and built master.  Note that Maven had a problem building
> Kafka (in my case, at least); I was unable to fix this easily so I moved on
> since it seemed unlikely to have any influence on the problem at hand.
>
> Master improves functionality (including the example Nicholas just
> demonstrated) but unfortunately there still seems to be a bug related to
> using dictionaries as values.  I've put some code below to illustrate the
> bug.
>
> *# dictionary as value works fine*
> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1":
> "value"}}'])).collect()
> [Row(key0=Row(key1=u'value'))]
>
> *# dictionary as value works fine, even when inner keys are varied*
> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}',
> '{"key0": {"key2": "value2"}}'])).collect()
> [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None,
> key2=u'value2'))]
>
> *# dictionary as value works fine when inner keys are missing and outer
> key is present*
> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1":
> "value1"}}'])).collect()
> [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]
>
> *# dictionary as value FAILS when outer key is missing*
> *> print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1":
> "value1"}}'])).collect()*
> Py4JJavaError: An error occurred while calling o84.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in
> stage 7.0 (TID 242, engelland.research.intel-research.net):
> java.lang.NullPointerException...
>
> *# dictionary as value FAILS when outer key is present with null value*
> *> print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0":
> {"key1": "value1"}}'])).collect()*
> Py4JJavaError: An error occurred while calling o98.collect.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
> 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in
> stage 9.0 (TID 305, kunitz.research.intel-research.net):
> java.lang.NullPointerException...
>
> *# nested lists work even when outer key is missing*
> > print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0",
> "item1"], ["item2", "item3"]]}'])).collect()
> [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]
>
> Is anyone able to replicate this behavior?
>
> -Brad
>
>
>
>
> On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> We try to keep master very stable, but this is where active development
>> happens. YMMV, but a lot of people do run very close to master without
>> incident (myself included).
>>
>> branch-1.0 has been cut for a while and we only merge bug fixes into it
>> (this is more strict for non-alpha components like spark core.).  For Spark
>> SQL, this branch is pretty far behind as the project is very young and we
>> are fixing bugs / adding features very rapidly compared with Spark core.
>>
>> branch-1.1 was just cut and is being QAed for a release, at this point
>> its likely the same as master, but that will change as features start
>> getting added to master in the coming weeks.
>>
>>
>>
>> On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> collect() works, too.
>>>
>>> >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', 
>>> >>> '{"foo":[[1,2,3], [4,5,6]]}'])).collect()
>>> [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]
>>>
>>> Can’t answer your question about branch stability, though. Spark is a
>>> very active project, so stuff is happening all the time.
>>>
>>> Nick
>>> ​
>>>
>>>
>>> On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller <bmill...@eecs.berkeley.edu>
>>> wrote:
>>>
>>>> Hi Nick,
>>>>
>>>> Can you check that the call to "collect()" works as well as
>>>> "printSchema()"?  I actually experience that "printSchema()" works fine,
>>>> but then it crashes on "collect()".
>>>>
>>>> In general, should I expect the master (which seems to be on
>>>> branch-1.1) to be any more/less stable than branch-1.0?  While it would be
>>>> great to have this fixed, it would be good to know if I should expect lots
>>>> of other instability.
>>>>
>>>> best,
>>>> -Brad
>>>>
>>>>
>>>> On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas <
>>>> nicholas.cham...@gmail.com> wrote:
>>>>
>>>>> This looks to be fixed in master:
>>>>>
>>>>> >>> from pyspark.sql import SQLContext>>> sqlContext = SQLContext(sc)
>>>>> >>> sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], 
>>>>> >>> [4,5,6]]}'
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ])
>>>>> ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315>>> 
>>>>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', 
>>>>> '{"foo":[[1,2,3], [4,5,6]]}']))
>>>>> MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408>>> 
>>>>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', 
>>>>> '{"foo":[[1,2,3], [4,5,6]]}'])).printSchema()
>>>>> root
>>>>>  |-- foo: array (nullable = true)
>>>>>  |    |-- element: array (containsNull = false)
>>>>>  |    |    |-- element: integer (containsNull = false)
>>>>>
>>>>> >>>
>>>>>
>>>>> Nick
>>>>> ​
>>>>>
>>>>>
>>>>> On Tue, Aug 5, 2014 at 7:12 PM, Brad Miller <
>>>>> bmill...@eecs.berkeley.edu> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I've built and deployed the current head of branch-1.0, but it seems
>>>>>> to have only partly fixed the bug.
>>>>>>
>>>>>> This code now runs as expected with the indicated output:
>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[1,2,3]}',
>>>>>> '{"foo":[4,5,6]}']))
>>>>>> > srdd.printSchema()
>>>>>> root
>>>>>>  |-- foo: ArrayType[IntegerType]
>>>>>> > srdd.collect()
>>>>>> [{u'foo': [1, 2, 3]}, {u'foo': [4, 5, 6]}]
>>>>>>
>>>>>> This code still crashes:
>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}',
>>>>>> '{"foo":[[1,2,3], [4,5,6]]}']))
>>>>>> > srdd.printSchema()
>>>>>> root
>>>>>>  |-- foo: ArrayType[ArrayType(IntegerType)]
>>>>>> > srdd.collect()
>>>>>> Py4JJavaError: An error occurred while calling o63.collect.
>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> Task 3.0:29 failed 4 times, most recent failure: Exception failure in TID
>>>>>> 67 on host kunitz.research.intel-research.net:
>>>>>> net.razorvine.pickle.PickleException: couldn't introspect javabean:
>>>>>> java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>
>>>>>> I may be able to see if this is fixed in master, but since it's not
>>>>>> fixed in 1.0.3 it seems unlikely to be fixed in master either. I 
>>>>>> previously
>>>>>> tried master as well, but ran into a build problem that did not occur 
>>>>>> with
>>>>>> the 1.0 branch.
>>>>>>
>>>>>> Can anybody else verify that the second example still crashes (and is
>>>>>> meant to work)? If so, would it be best to modify JIRA-2376 or start a 
>>>>>> new
>>>>>> bug?
>>>>>> https://issues.apache.org/jira/browse/SPARK-2376
>>>>>>
>>>>>> best,
>>>>>> -Brad
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 5, 2014 at 12:10 PM, Brad Miller <
>>>>>> bmill...@eecs.berkeley.edu> wrote:
>>>>>>
>>>>>>> Nick: Thanks for both the original JIRA bug report and the link.
>>>>>>>
>>>>>>> Michael: This is on the 1.0.1 release.  I'll update to master and
>>>>>>> follow-up if I have any problems.
>>>>>>>
>>>>>>> best,
>>>>>>> -Brad
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust <
>>>>>>> mich...@databricks.com> wrote:
>>>>>>>
>>>>>>>> Is this on 1.0.1?  I'd suggest running this on master or the 1.1-RC
>>>>>>>> which should be coming out this week.  Pyspark did not have good 
>>>>>>>> support
>>>>>>>> for nested data previously.  If you still encounter issues using a more
>>>>>>>> recent version, please file a JIRA.  Thanks!
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller <
>>>>>>>> bmill...@eecs.berkeley.edu> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I am interested to use jsonRDD and jsonFile to create a SchemaRDD
>>>>>>>>> out of some JSON data I have, but I've run into some instability 
>>>>>>>>> involving
>>>>>>>>> the following java exception:
>>>>>>>>>
>>>>>>>>> An error occurred while calling o1326.collect.
>>>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>> failure: Task 181.0:29 failed 4 times, most recent failure: Exception
>>>>>>>>> failure in TID 1664 on host neal.research.intel-research.net:
>>>>>>>>> net.razorvine.pickle.PickleException: couldn't introspect javabean:
>>>>>>>>> java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>>>
>>>>>>>>> I've pasted code which produces the error as well as the full
>>>>>>>>> traceback below.  Note that I don't have any problem when I parse the 
>>>>>>>>> JSON
>>>>>>>>> myself and use inferSchema.
>>>>>>>>>
>>>>>>>>> Is anybody able to reproduce this bug?
>>>>>>>>>
>>>>>>>>> -Brad
>>>>>>>>>
>>>>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar",
>>>>>>>>> "baz":[1,2,3]}', '{"foo":"boom", "baz":[1,2,3]}']))
>>>>>>>>> > srdd.printSchema()
>>>>>>>>>
>>>>>>>>> root
>>>>>>>>>  |-- baz: ArrayType[IntegerType]
>>>>>>>>>  |-- foo: StringType
>>>>>>>>>
>>>>>>>>> > srdd.collect()
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> ---------------------------------------------------------------------------
>>>>>>>>> Py4JJavaError                             Traceback (most recent
>>>>>>>>> call last)
>>>>>>>>> <ipython-input-89-ec7e8e8c68c4> in <module>()
>>>>>>>>> ----> 1 srdd.collect()
>>>>>>>>>
>>>>>>>>> /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in
>>>>>>>>> collect(self)
>>>>>>>>>     581         """
>>>>>>>>>     582         with _JavaStackTrace(self.context) as st:
>>>>>>>>> --> 583           bytesInJava = self._jrdd.collect().iterator()
>>>>>>>>>     584         return
>>>>>>>>> list(self._collect_iterator_through_file(bytesInJava))
>>>>>>>>>     585
>>>>>>>>>
>>>>>>>>> /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in
>>>>>>>>> __call__(self, *args)
>>>>>>>>>     535         answer = self.gateway_client.send_command(command)
>>>>>>>>>     536         return_value = get_return_value(answer,
>>>>>>>>> self.gateway_client,
>>>>>>>>> --> 537                 self.target_id, self.name)
>>>>>>>>>     538
>>>>>>>>>     539         for temp_arg in temp_args:
>>>>>>>>>
>>>>>>>>> /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in
>>>>>>>>> get_return_value(answer, gateway_client, target_id, name)
>>>>>>>>>     298                 raise Py4JJavaError(
>>>>>>>>>     299                     'An error occurred while calling
>>>>>>>>> {0}{1}{2}.\n'.
>>>>>>>>> --> 300                     format(target_id, '.', name), value)
>>>>>>>>>     301             else:
>>>>>>>>>     302                 raise Py4JError(
>>>>>>>>>
>>>>>>>>> Py4JJavaError: An error occurred while calling o1326.collect.
>>>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>> failure: Task 181.0:29 failed 4 times, most recent failure: Exception
>>>>>>>>> failure in TID 1664 on host neal.research.intel-research.net:
>>>>>>>>> net.razorvine.pickle.PickleException: couldn't introspect javabean:
>>>>>>>>> java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>>>         net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>         net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>
>>>>>>>>> net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392)
>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>         net.razorvine.pickle.Pickler.dump(Pickler.java:95)
>>>>>>>>>         net.razorvine.pickle.Pickler.dumps(Pickler.java:80)
>>>>>>>>>
>>>>>>>>> org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
>>>>>>>>>
>>>>>>>>> org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
>>>>>>>>>         scala.collection.Iterator$anon$11.next(Iterator.scala:328)
>>>>>>>>>
>>>>>>>>> org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294)
>>>>>>>>>
>>>>>>>>> org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>>>>>>>>>
>>>>>>>>> org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)
>>>>>>>>>
>>>>>>>>> org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)
>>>>>>>>>
>>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>>>>>>>>>
>>>>>>>>> org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>>>>>>>>> Driver stacktrace:
>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>>>>> $apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
>>>>>>>>> at
>>>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>> at
>>>>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
>>>>>>>>> at
>>>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>>>>> at
>>>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to