Actually, I can imagine a one or two line fix for this bug: call
row.asDict() inside a wrapper for DataFrame.rdd. Probably deluding myself
this could be so easily resolved? :)

On Wed, Mar 30, 2016 at 6:10 PM, Russell Jurney <russell.jur...@gmail.com>
wrote:

> Thanks to some excellent work by Luke Lovett, we have confirmed this is a
> bug. DataFrame.rdds are not the same as normal RDDs, they are serialized
> differently. It may just be unsupported functionality in PySpark. If that
> is the case, I think this should be added/fixed soon.
>
> The bug is here: https://issues.apache.org/jira/browse/SPARK-14229
> More info about the workaround from Luke is here:
> https://jira.mongodb.org/browse/HADOOP-276
>
> Please follow the SPARK bug if you're here, as more votes will get it more
> attention. I'm surprised that this hasn't been previously reported, as
> saving to a database is a pretty common thing to do from PySpark, and lots
> of analysis must be happening in DataFrames in PySpark?
>
> Anyway, the workaround for this bug is easy, cast the rows as dicts:
>
> my_dataframe = my_dataframe.map(lambda row: row.asDict())
>
>
> On Mon, Mar 28, 2016 at 8:08 PM, Russell Jurney <russell.jur...@gmail.com>
> wrote:
>
>> btw, they can't be saved to BSON either. This seems a generic issue, can
>> anyone else reproduce this?
>>
>> On Mon, Mar 28, 2016 at 8:02 PM, Russell Jurney <russell.jur...@gmail.com
>> > wrote:
>>
>>> I created a JIRA: https://issues.apache.org/jira/browse/SPARK-14229
>>>
>>> On Mon, Mar 28, 2016 at 7:43 PM, Russell Jurney <
>>> russell.jur...@gmail.com> wrote:
>>>
>>>> Ted, I am using the .rdd method, see above, but for some reason these
>>>> RDDs can't be saved to MongoDB or ElasticSearch.
>>>>
>>>> I think this is a bug in PySpark/DataFrame. I can't think of another
>>>> explanation... somehow DataFrame.rdd RDDs are not able to be stored to an
>>>> arbitrary Hadoop OutputFormat. When I do this:
>>>>
>>>> on_time_lines =
>>>> sc.textFile("../data/On_Time_On_Time_Performance_2015.jsonl.gz")
>>>> on_time_performance = on_time_lines.map(lambda x: json.loads(x))
>>>>
>>>>
>>>> on_time_performance.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>
>>>>
>>>> It works. Same data, but loaded as textFile instead of DataFrame (via
>>>> json/parquet dataframe loading).
>>>>
>>>> It is the DataFrame.rdd bit that is broken. I will file a JIRA.
>>>>
>>>> Does anyone know a workaround?
>>>>
>>>> On Mon, Mar 28, 2016 at 7:28 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> See this method:
>>>>>
>>>>>   lazy val rdd: RDD[T] = {
>>>>>
>>>>> On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney <
>>>>> russell.jur...@gmail.com> wrote:
>>>>>
>>>>>> Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD.
>>>>>> This seems related to DataFrames. Is there a way to convert a DataFrame's
>>>>>> RDD to a 'normal' RDD?
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney <
>>>>>> russell.jur...@gmail.com> wrote:
>>>>>>
>>>>>>> I filed a JIRA <https://jira.mongodb.org/browse/HADOOP-276> in the
>>>>>>> mongo-hadoop project, but I'm curious if anyone else has seen this 
>>>>>>> issue.
>>>>>>> Anyone have any idea what to do? I can't save to Mongo from PySpark. A
>>>>>>> contrived example works, but a dataframe does not.
>>>>>>>
>>>>>>> I activate pymongo_spark and load a dataframe:
>>>>>>>
>>>>>>> import pymongo
>>>>>>> import pymongo_spark
>>>>>>> # Important: activate pymongo_spark.
>>>>>>> pymongo_spark.activate()
>>>>>>>
>>>>>>> on_time_dataframe =
>>>>>>> sqlContext.read.parquet('../data/on_time_performance.parquet')
>>>>>>>
>>>>>>> Then I try saving to MongoDB in two ways:
>>>>>>>
>>>>>>>
>>>>>>> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>>>
>>>>>>> on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
>>>>>>>   path='file://unused',
>>>>>>>   outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
>>>>>>>   keyClass='org.apache.hadoop.io.Text',
>>>>>>>   valueClass='org.apache.hadoop.io.MapWritable',
>>>>>>>   conf={"mongo.output.uri":
>>>>>>> "mongodb://localhost:27017/agile_data_science.on_time_performance"}
>>>>>>> )
>>>>>>>
>>>>>>>
>>>>>>> But I always get this error:
>>>>>>>
>>>>>>> In [7]:
>>>>>>> on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to
>>>>>>> process : 1
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at
>>>>>>> PythonRDD.scala:393
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at
>>>>>>> PythonRDD.scala:393) with 1 output partitions
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage:
>>>>>>> ResultStage 2 (runJob at PythonRDD.scala:393)
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final
>>>>>>> stage: List()
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents:
>>>>>>> List()
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting
>>>>>>> ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no
>>>>>>> missing parents
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored
>>>>>>> as values in memory (estimated size 19.3 KB, free 249.2 KB)
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0
>>>>>>> stored as bytes in memory (estimated size 9.7 KB, free 258.9 KB)
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added
>>>>>>> broadcast_5_piece0 in memory on localhost:59881 (size: 9.7 KB, free: 
>>>>>>> 511.1
>>>>>>> MB)
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from
>>>>>>> broadcast at DAGScheduler.scala:1006
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing
>>>>>>> tasks from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43)
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set
>>>>>>> 2.0 with 1 tasks
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0
>>>>>>> in stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage
>>>>>>> 2.0 (TID 2)
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split:
>>>>>>> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO compress.CodecPool: Got brand-new
>>>>>>> decompressor [.gz]
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO python.PythonRunner: Times: total = 1310,
>>>>>>> boot = 1249, init = 58, finish = 3
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO executor.Executor: Finished task 0.0 in stage
>>>>>>> 2.0 (TID 2). 4475 bytes result sent to driver
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Finished task 0.0
>>>>>>> in stage 2.0 (TID 2) in 1345 ms on localhost (1/1)
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet
>>>>>>> 2.0, whose tasks have all completed, from pool
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 2 (runJob
>>>>>>> at PythonRDD.scala:393) finished in 1.346 s
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 2 finished:
>>>>>>> runJob at PythonRDD.scala:393, took 1.361003 s
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO spark.SparkContext: Starting job: take at
>>>>>>> SerDeUtil.scala:231
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Got job 3 (take at
>>>>>>> SerDeUtil.scala:231) with 1 output partitions
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Final stage:
>>>>>>> ResultStage 3 (take at SerDeUtil.scala:231)
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Parents of final
>>>>>>> stage: List()
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Missing parents:
>>>>>>> List()
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting
>>>>>>> ResultStage 3 (MapPartitionsRDD[15] at mapPartitions at
>>>>>>> SerDeUtil.scala:146), which has no missing parents
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6 stored
>>>>>>> as values in memory (estimated size 19.6 KB, free 278.4 KB)
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6_piece0
>>>>>>> stored as bytes in memory (estimated size 9.8 KB, free 288.2 KB)
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO storage.BlockManagerInfo: Added
>>>>>>> broadcast_6_piece0 in memory on localhost:59881 (size: 9.8 KB, free: 
>>>>>>> 511.1
>>>>>>> MB)
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO spark.SparkContext: Created broadcast 6 from
>>>>>>> broadcast at DAGScheduler.scala:1006
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting 1 missing
>>>>>>> tasks from ResultStage 3 (MapPartitionsRDD[15] at mapPartitions at
>>>>>>> SerDeUtil.scala:146)
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Adding task set
>>>>>>> 3.0 with 1 tasks
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Starting task 0.0
>>>>>>> in stage 3.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 2666 bytes)
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO executor.Executor: Running task 0.0 in stage
>>>>>>> 3.0 (TID 3)
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO rdd.HadoopRDD: Input split:
>>>>>>> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO compress.CodecPool: Got brand-new
>>>>>>> decompressor [.gz]
>>>>>>>
>>>>>>> 16/03/28 18:04:07 ERROR executor.Executor: Exception in task 0.0 in
>>>>>>> stage 3.0 (TID 3)
>>>>>>>
>>>>>>> net.razorvine.pickle.PickleException: expected zero arguments for
>>>>>>> construction of ClassDict (for pyspark.sql.types._create_row)
>>>>>>>
>>>>>>> at
>>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149)
>>>>>>>
>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>
>>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>>>>>>>
>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>>>>
>>>>>>> at scala.collection.TraversableOnce$class.to
>>>>>>> (TraversableOnce.scala:273)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>>
>>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>>>
>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> 16/03/28 18:04:07 WARN scheduler.TaskSetManager: Lost task 0.0 in
>>>>>>> stage 3.0 (TID 3, localhost): net.razorvine.pickle.PickleException:
>>>>>>> expected zero arguments for construction of ClassDict (for
>>>>>>> pyspark.sql.types._create_row)
>>>>>>>
>>>>>>> at
>>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149)
>>>>>>>
>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>
>>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>>>>>>>
>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>>>>
>>>>>>> at scala.collection.TraversableOnce$class.to
>>>>>>> (TraversableOnce.scala:273)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>>
>>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>>>
>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>>
>>>>>>> 16/03/28 18:04:07 ERROR scheduler.TaskSetManager: Task 0 in stage
>>>>>>> 3.0 failed 1 times; aborting job
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet
>>>>>>> 3.0, whose tasks have all completed, from pool
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Cancelling stage
>>>>>>> 3
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 3 (take
>>>>>>> at SerDeUtil.scala:231) failed in 0.117 s
>>>>>>>
>>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 3 failed: take at
>>>>>>> SerDeUtil.scala:231, took 0.134593 s
>>>>>>>
>>>>>>>
>>>>>>> ---------------------------------------------------------------------------
>>>>>>>
>>>>>>> Py4JJavaError                             Traceback (most recent
>>>>>>> call last)
>>>>>>>
>>>>>>> <ipython-input-7-d1f984f17e27> in <module>()
>>>>>>>
>>>>>>> ----> 1 on_time_rdd.saveToMongoDB
>>>>>>> ('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>>>
>>>>>>>
>>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/lib/pymongo_spark.pyc in
>>>>>>> saveToMongoDB(self, connection_string, config)
>>>>>>>
>>>>>>>     104         keyConverter
>>>>>>> ='com.mongodb.spark.pickle.NoopConverter',
>>>>>>>
>>>>>>>     105         valueConverter
>>>>>>> ='com.mongodb.spark.pickle.NoopConverter',
>>>>>>>
>>>>>>> --> 106         conf=conf)
>>>>>>>
>>>>>>>     107
>>>>>>>
>>>>>>>     108
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.pyc
>>>>>>> in saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass,
>>>>>>> valueClass, keyConverter, valueConverter, conf)
>>>>>>>
>>>>>>>    1372
>>>>>>> outputFormatClass,
>>>>>>>
>>>>>>>    1373
>>>>>>> keyClass, valueClass,
>>>>>>>
>>>>>>> -> 1374
>>>>>>> keyConverter, valueConverter, jconf)
>>>>>>>
>>>>>>>    1375
>>>>>>>
>>>>>>>    1376     def saveAsHadoopDataset(self, conf, keyConverter=None,
>>>>>>> valueConverter=None):
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py
>>>>>>> in __call__(self, *args)
>>>>>>>
>>>>>>>     811         answer = self.gateway_client.send_command(command)
>>>>>>>
>>>>>>>     812         return_value = get_return_value(
>>>>>>>
>>>>>>> --> 813             answer, self.gateway_client, self.target_id,
>>>>>>> self.name)
>>>>>>>
>>>>>>>     814
>>>>>>>
>>>>>>>     815         for temp_arg in temp_args:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/utils.pyc
>>>>>>> in deco(*a, **kw)
>>>>>>>
>>>>>>>      43     def deco(*a, **kw):
>>>>>>>
>>>>>>>      44         try:
>>>>>>>
>>>>>>> ---> 45             return f(*a, **kw)
>>>>>>>
>>>>>>>      46         except py4j.protocol.Py4JJavaError as e:
>>>>>>>
>>>>>>>      47             s = e.java_exception.toString()
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py
>>>>>>> in get_return_value(answer, gateway_client, target_id, name)
>>>>>>>
>>>>>>>     306                 raise Py4JJavaError(
>>>>>>>
>>>>>>>     307                     "An error occurred while calling
>>>>>>> {0}{1}{2}.\n".
>>>>>>>
>>>>>>> --> 308                     format(target_id, ".", name), value)
>>>>>>>
>>>>>>>     309             else:
>>>>>>>
>>>>>>>     310                 raise Py4JError(
>>>>>>>
>>>>>>>
>>>>>>> Py4JJavaError: An error occurred while calling
>>>>>>> z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
>>>>>>>
>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>> Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 
>>>>>>> in
>>>>>>> stage 3.0 (TID 3, localhost): net.razorvine.pickle.PickleException:
>>>>>>> expected zero arguments for construction of ClassDict (for
>>>>>>> pyspark.sql.types._create_row)
>>>>>>>
>>>>>>> at
>>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149)
>>>>>>>
>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>
>>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>>>>>>>
>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>>>>
>>>>>>> at scala.collection.TraversableOnce$class.to
>>>>>>> (TraversableOnce.scala:273)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>>
>>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>>>
>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>>
>>>>>>> Driver stacktrace:
>>>>>>>
>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org
>>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>
>>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
>>>>>>>
>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>>>>>>>
>>>>>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>>>>>>>
>>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>>>>>>>
>>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>>>>>>>
>>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>>>>>>>
>>>>>>> at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>>>>
>>>>>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>>>>
>>>>>>> at org.apache.spark.rdd.RDD.take(RDD.scala:1302)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:231)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:775)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
>>>>>>>
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>
>>>>>>> at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>
>>>>>>> at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>>>>>>
>>>>>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
>>>>>>>
>>>>>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
>>>>>>>
>>>>>>> at py4j.Gateway.invoke(Gateway.java:259)
>>>>>>>
>>>>>>> at
>>>>>>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
>>>>>>>
>>>>>>> at py4j.commands.CallCommand.execute(CallCommand.java:79)
>>>>>>>
>>>>>>> at py4j.GatewayConnection.run(GatewayConnection.java:209)
>>>>>>>
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> Caused by: net.razorvine.pickle.PickleException: expected zero
>>>>>>> arguments for construction of ClassDict (for 
>>>>>>> pyspark.sql.types._create_row)
>>>>>>>
>>>>>>> at
>>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
>>>>>>>
>>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149)
>>>>>>>
>>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>>>>
>>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>>>>>>>
>>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>>>>
>>>>>>> at scala.collection.TraversableOnce$class.to
>>>>>>> (TraversableOnce.scala:273)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>>>>
>>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
>>>>>>>
>>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>>>>>>>
>>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>
>>>>>>> at
>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>
>>>>>>> ... 1 more
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com
>>>>>>> relato.io
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>>>
>>>
>>>
>>>
>>> --
>>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>>
>>
>>
>>
>> --
>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>>
>
>
>
> --
> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io
>



-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io

Reply via email to