Thanks to some excellent work by Luke Lovett, we have confirmed this is a
bug. DataFrame.rdds are not the same as normal RDDs, they are serialized
differently. It may just be unsupported functionality in PySpark. If that
is the case, I think this should be added/fixed soon.

The bug is here: https://issues.apache.org/jira/browse/SPARK-14229
More info about the workaround from Luke is here:
https://jira.mongodb.org/browse/HADOOP-276

Please follow the SPARK bug if you're here, as more votes will get it more
attention. I'm surprised that this hasn't been previously reported, as
saving to a database is a pretty common thing to do from PySpark, and lots
of analysis must be happening in DataFrames in PySpark?

Anyway, the workaround for this bug is easy, cast the rows as dicts:

my_dataframe = my_dataframe.map(lambda row: row.asDict())


On Mon, Mar 28, 2016 at 8:08 PM, Russell Jurney <russell.jur...@gmail.com>
wrote:

> btw, they can't be saved to BSON either. This seems a generic issue, can
> anyone else reproduce this?
>
> On Mon, Mar 28, 2016 at 8:02 PM, Russell Jurney <russell.jur...@gmail.com>
> wrote:
>
>> I created a JIRA: https://issues.apache.org/jira/browse/SPARK-14229
>>
>> On Mon, Mar 28, 2016 at 7:43 PM, Russell Jurney <russell.jur...@gmail.com
>> > wrote:
>>
>>> Ted, I am using the .rdd method, see above, but for some reason these
>>> RDDs can't be saved to MongoDB or ElasticSearch.
>>>
>>> I think this is a bug in PySpark/DataFrame. I can't think of another
>>> explanation... somehow DataFrame.rdd RDDs are not able to be stored to an
>>> arbitrary Hadoop OutputFormat. When I do this:
>>>
>>> on_time_lines =
>>> sc.textFile("../data/On_Time_On_Time_Performance_2015.jsonl.gz")
>>> on_time_performance = on_time_lines.map(lambda x: json.loads(x))
>>>
>>>
>>> on_time_performance.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>
>>>
>>> It works. Same data, but loaded as textFile instead of DataFrame (via
>>> json/parquet dataframe loading).
>>>
>>> It is the DataFrame.rdd bit that is broken. I will file a JIRA.
>>>
>>> Does anyone know a workaround?
>>>
>>> On Mon, Mar 28, 2016 at 7:28 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>
>>>> See this method:
>>>>
>>>>   lazy val rdd: RDD[T] = {
>>>>
>>>> On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney <
>>>> russell.jur...@gmail.com> wrote:
>>>>
>>>>> Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD.
>>>>> This seems related to DataFrames. Is there a way to convert a DataFrame's
>>>>> RDD to a 'normal' RDD?
>>>>>
>>>>>
>>>>> On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney <
>>>>> russell.jur...@gmail.com> wrote:
>>>>>
>>>>>> I filed a JIRA <https://jira.mongodb.org/browse/HADOOP-276> in the
>>>>>> mongo-hadoop project, but I'm curious if anyone else has seen this issue.
>>>>>> Anyone have any idea what to do? I can't save to Mongo from PySpark. A
>>>>>> contrived example works, but a dataframe does not.
>>>>>>
>>>>>> I activate pymongo_spark and load a dataframe:
>>>>>>
>>>>>> import pymongo
>>>>>> import pymongo_spark
>>>>>> # Important: activate pymongo_spark.
>>>>>> pymongo_spark.activate()
>>>>>>
>>>>>> on_time_dataframe =
>>>>>> sqlContext.read.parquet('../data/on_time_performance.parquet')
>>>>>>
>>>>>> Then I try saving to MongoDB in two ways:
>>>>>>
>>>>>>
>>>>>> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>>
>>>>>> on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
>>>>>>   path='file://unused',
>>>>>>   outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
>>>>>>   keyClass='org.apache.hadoop.io.Text',
>>>>>>   valueClass='org.apache.hadoop.io.MapWritable',
>>>>>>   conf={"mongo.output.uri":
>>>>>> "mongodb://localhost:27017/agile_data_science.on_time_performance"}
>>>>>> )
>>>>>>
>>>>>>
>>>>>> But I always get this error:
>>>>>>
>>>>>> In [7]:
>>>>>> on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to
>>>>>> process : 1
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at
>>>>>> PythonRDD.scala:393
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at
>>>>>> PythonRDD.scala:393) with 1 output partitions
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage:
>>>>>> ResultStage 2 (runJob at PythonRDD.scala:393)
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final
>>>>>> stage: List()
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List()
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage
>>>>>> 2 (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing 
>>>>>> parents
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored
>>>>>> as values in memory (estimated size 19.3 KB, free 249.2 KB)
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0
>>>>>> stored as bytes in memory (estimated size 9.7 KB, free 258.9 KB)
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added
>>>>>> broadcast_5_piece0 in memory on localhost:59881 (size: 9.7 KB, free: 
>>>>>> 511.1
>>>>>> MB)
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from
>>>>>> broadcast at DAGScheduler.scala:1006
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing
>>>>>> tasks from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43)
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set
>>>>>> 2.0 with 1 tasks
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0 in
>>>>>> stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage
>>>>>> 2.0 (TID 2)
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split:
>>>>>> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777
>>>>>>
>>>>>> 16/03/28 18:04:06 INFO compress.CodecPool: Got brand-new decompressor
>>>>>> [.gz]
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO python.PythonRunner: Times: total = 1310, boot
>>>>>> = 1249, init = 58, finish = 3
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO executor.Executor: Finished task 0.0 in stage
>>>>>> 2.0 (TID 2). 4475 bytes result sent to driver
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Finished task 0.0 in
>>>>>> stage 2.0 (TID 2) in 1345 ms on localhost (1/1)
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet
>>>>>> 2.0, whose tasks have all completed, from pool
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 2 (runJob
>>>>>> at PythonRDD.scala:393) finished in 1.346 s
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 2 finished: runJob
>>>>>> at PythonRDD.scala:393, took 1.361003 s
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO spark.SparkContext: Starting job: take at
>>>>>> SerDeUtil.scala:231
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Got job 3 (take at
>>>>>> SerDeUtil.scala:231) with 1 output partitions
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Final stage:
>>>>>> ResultStage 3 (take at SerDeUtil.scala:231)
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Parents of final
>>>>>> stage: List()
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Missing parents: List()
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting ResultStage
>>>>>> 3 (MapPartitionsRDD[15] at mapPartitions at SerDeUtil.scala:146), which 
>>>>>> has
>>>>>> no missing parents
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6 stored
>>>>>> as values in memory (estimated size 19.6 KB, free 278.4 KB)
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6_piece0
>>>>>> stored as bytes in memory (estimated size 9.8 KB, free 288.2 KB)
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO storage.BlockManagerInfo: Added
>>>>>> broadcast_6_piece0 in memory on localhost:59881 (size: 9.8 KB, free: 
>>>>>> 511.1
>>>>>> MB)
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO spark.SparkContext: Created broadcast 6 from
>>>>>> broadcast at DAGScheduler.scala:1006
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting 1 missing
>>>>>> tasks from ResultStage 3 (MapPartitionsRDD[15] at mapPartitions at
>>>>>> SerDeUtil.scala:146)
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Adding task set
>>>>>> 3.0 with 1 tasks
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Starting task 0.0 in
>>>>>> stage 3.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO executor.Executor: Running task 0.0 in stage
>>>>>> 3.0 (TID 3)
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO rdd.HadoopRDD: Input split:
>>>>>> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO compress.CodecPool: Got brand-new decompressor
>>>>>> [.gz]
>>>>>>
>>>>>> 16/03/28 18:04:07 ERROR executor.Executor: Exception in task 0.0 in
>>>>>> stage 3.0 (TID 3)
>>>>>>
>>>>>> net.razorvine.pickle.PickleException: expected zero arguments for
>>>>>> construction of ClassDict (for pyspark.sql.types._create_row)
>>>>>>
>>>>>> at
>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149)
>>>>>>
>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>
>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>>>>>>
>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>
>>>>>> at
>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>
>>>>>> at
>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>>>
>>>>>> at scala.collection.TraversableOnce$class.to
>>>>>> (TraversableOnce.scala:273)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>
>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>>
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>>
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> 16/03/28 18:04:07 WARN scheduler.TaskSetManager: Lost task 0.0 in
>>>>>> stage 3.0 (TID 3, localhost): net.razorvine.pickle.PickleException:
>>>>>> expected zero arguments for construction of ClassDict (for
>>>>>> pyspark.sql.types._create_row)
>>>>>>
>>>>>> at
>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149)
>>>>>>
>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>
>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>>>>>>
>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>
>>>>>> at
>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>
>>>>>> at
>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>>>
>>>>>> at scala.collection.TraversableOnce$class.to
>>>>>> (TraversableOnce.scala:273)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>
>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>>
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>>
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>>
>>>>>> 16/03/28 18:04:07 ERROR scheduler.TaskSetManager: Task 0 in stage 3.0
>>>>>> failed 1 times; aborting job
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet
>>>>>> 3.0, whose tasks have all completed, from pool
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Cancelling stage 3
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 3 (take at
>>>>>> SerDeUtil.scala:231) failed in 0.117 s
>>>>>>
>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 3 failed: take at
>>>>>> SerDeUtil.scala:231, took 0.134593 s
>>>>>>
>>>>>>
>>>>>> ---------------------------------------------------------------------------
>>>>>>
>>>>>> Py4JJavaError                             Traceback (most recent
>>>>>> call last)
>>>>>>
>>>>>> <ipython-input-7-d1f984f17e27> in <module>()
>>>>>>
>>>>>> ----> 1 on_time_rdd.saveToMongoDB
>>>>>> ('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>>
>>>>>>
>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/lib/pymongo_spark.pyc in
>>>>>> saveToMongoDB(self, connection_string, config)
>>>>>>
>>>>>>     104         keyConverter
>>>>>> ='com.mongodb.spark.pickle.NoopConverter',
>>>>>>
>>>>>>     105         valueConverter
>>>>>> ='com.mongodb.spark.pickle.NoopConverter',
>>>>>>
>>>>>> --> 106         conf=conf)
>>>>>>
>>>>>>     107
>>>>>>
>>>>>>     108
>>>>>>
>>>>>>
>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.pyc
>>>>>> in saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass,
>>>>>> valueClass, keyConverter, valueConverter, conf)
>>>>>>
>>>>>>    1372
>>>>>> outputFormatClass,
>>>>>>
>>>>>>    1373
>>>>>> keyClass, valueClass,
>>>>>>
>>>>>> -> 1374
>>>>>> keyConverter, valueConverter, jconf)
>>>>>>
>>>>>>    1375
>>>>>>
>>>>>>    1376     def saveAsHadoopDataset(self, conf, keyConverter=None,
>>>>>> valueConverter=None):
>>>>>>
>>>>>>
>>>>>>
>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py
>>>>>> in __call__(self, *args)
>>>>>>
>>>>>>     811         answer = self.gateway_client.send_command(command)
>>>>>>
>>>>>>     812         return_value = get_return_value(
>>>>>>
>>>>>> --> 813             answer, self.gateway_client, self.target_id,
>>>>>> self.name)
>>>>>>
>>>>>>     814
>>>>>>
>>>>>>     815         for temp_arg in temp_args:
>>>>>>
>>>>>>
>>>>>>
>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/utils.pyc
>>>>>> in deco(*a, **kw)
>>>>>>
>>>>>>      43     def deco(*a, **kw):
>>>>>>
>>>>>>      44         try:
>>>>>>
>>>>>> ---> 45             return f(*a, **kw)
>>>>>>
>>>>>>      46         except py4j.protocol.Py4JJavaError as e:
>>>>>>
>>>>>>      47             s = e.java_exception.toString()
>>>>>>
>>>>>>
>>>>>>
>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py
>>>>>> in get_return_value(answer, gateway_client, target_id, name)
>>>>>>
>>>>>>     306                 raise Py4JJavaError(
>>>>>>
>>>>>>     307                     "An error occurred while calling
>>>>>> {0}{1}{2}.\n".
>>>>>>
>>>>>> --> 308                     format(target_id, ".", name), value)
>>>>>>
>>>>>>     309             else:
>>>>>>
>>>>>>     310                 raise Py4JError(
>>>>>>
>>>>>>
>>>>>> Py4JJavaError: An error occurred while calling
>>>>>> z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
>>>>>>
>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>> Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in
>>>>>> stage 3.0 (TID 3, localhost): net.razorvine.pickle.PickleException:
>>>>>> expected zero arguments for construction of ClassDict (for
>>>>>> pyspark.sql.types._create_row)
>>>>>>
>>>>>> at
>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149)
>>>>>>
>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>
>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>>>>>>
>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>
>>>>>> at
>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>
>>>>>> at
>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>>>
>>>>>> at scala.collection.TraversableOnce$class.to
>>>>>> (TraversableOnce.scala:273)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>
>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>>
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>>
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>>
>>>>>> Driver stacktrace:
>>>>>>
>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>>>>>>
>>>>>> at
>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>
>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>>
>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>>>>>>
>>>>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>>>>>>
>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>>>>>>
>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>>>>>>
>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>>>>>>
>>>>>> at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>>
>>>>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>>
>>>>>> at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:231)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:775)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
>>>>>>
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>
>>>>>> at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>
>>>>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>>
>>>>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>>>>>
>>>>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>>>>>>
>>>>>> at py4j.Gateway.invoke(Gateway.java:259)
>>>>>>
>>>>>> at
>>>>>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>>>>>
>>>>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>>>
>>>>>> at py4j.GatewayConnection.run(GatewayConnection.java:209)
>>>>>>
>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>
>>>>>> Caused by: net.razorvine.pickle.PickleException: expected zero
>>>>>> arguments for construction of ClassDict (for 
>>>>>> pyspark.sql.types._create_row)
>>>>>>
>>>>>> at
>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>>>>>
>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149)
>>>>>>
>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>
>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>>>>>>
>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>
>>>>>> at
>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>
>>>>>> at
>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>>>
>>>>>> at scala.collection.TraversableOnce$class.to
>>>>>> (TraversableOnce.scala:273)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>>>
>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>
>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>>
>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>
>>>>>> at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>>
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>
>>>>>> ... 1 more
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>>
>>
>>
>>
>> --
>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>
>
>
>
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>



-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io

Reply via email to