Actually, I can imagine a one- or two-line fix for this bug: call row.asDict() inside a wrapper for DataFrame.rdd. Am I deluding myself that this could be so easily resolved? :)
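To make the idea concrete, here is a rough, untested sketch of such a wrapper. It assumes that converting each Row with Row.asDict() is enough for the JVM side to unpickle the records, as the workaround quoted below suggests. rdd_of_dicts is a made-up helper name, not part of PySpark or pymongo_spark.

```python
def rdd_of_dicts(dataframe, recursive=True):
    """Return dataframe.rdd with every Row converted to a plain dict.

    Hypothetical helper for illustration only; it is not part of PySpark
    or pymongo_spark.
    """
    # Row.asDict(recursive=True) also converts nested Rows into dicts,
    # so no Row objects are left for the JVM-side unpickler to choke on.
    return dataframe.rdd.map(lambda row: row.asDict(recursive=recursive))


# Intended usage, with the names from the original report:
#   on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')
#   rdd_of_dicts(on_time_dataframe).saveToMongoDB(
#       'mongodb://localhost:27017/agile_data_science.on_time_performance')
```

A real fix inside PySpark would presumably live in DataFrame.rdd or in the save path itself; this only shows the shape of the conversion.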
On Wed, Mar 30, 2016 at 6:10 PM, Russell Jurney <russell.jur...@gmail.com> wrote:

> Thanks to some excellent work by Luke Lovett, we have confirmed this is a
> bug. DataFrame.rdds are not the same as normal RDDs; they are serialized
> differently. It may just be unsupported functionality in PySpark. If that
> is the case, I think this should be added/fixed soon.
>
> The bug is here: https://issues.apache.org/jira/browse/SPARK-14229
> More info about the workaround from Luke is here:
> https://jira.mongodb.org/browse/HADOOP-276
>
> Please follow the SPARK bug if you're here, as more votes will get it more
> attention. I'm surprised that this hasn't been previously reported, as
> saving to a database is a pretty common thing to do from PySpark, and lots
> of analysis must be happening in DataFrames in PySpark?
>
> Anyway, the workaround for this bug is easy: cast the rows as dicts.
>
> my_dataframe = my_dataframe.map(lambda row: row.asDict())
>
>
> On Mon, Mar 28, 2016 at 8:08 PM, Russell Jurney <russell.jur...@gmail.com>
> wrote:
>
>> btw, they can't be saved to BSON either. This seems to be a generic
>> issue. Can anyone else reproduce this?
>>
>> On Mon, Mar 28, 2016 at 8:02 PM, Russell Jurney <russell.jur...@gmail.com>
>> wrote:
>>
>>> I created a JIRA: https://issues.apache.org/jira/browse/SPARK-14229
>>>
>>> On Mon, Mar 28, 2016 at 7:43 PM, Russell Jurney
>>> <russell.jur...@gmail.com> wrote:
>>>
>>>> Ted, I am using the .rdd method, see above, but for some reason these
>>>> RDDs can't be saved to MongoDB or Elasticsearch.
>>>>
>>>> I think this is a bug in PySpark/DataFrame. I can't think of another
>>>> explanation... somehow DataFrame.rdd RDDs are not able to be stored to an
>>>> arbitrary Hadoop OutputFormat.
>>>> When I do this:
>>>>
>>>> import json
>>>>
>>>> on_time_lines = sc.textFile("../data/On_Time_On_Time_Performance_2015.jsonl.gz")
>>>> on_time_performance = on_time_lines.map(lambda x: json.loads(x))
>>>>
>>>> on_time_performance.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>
>>>> It works. Same data, but loaded as textFile instead of DataFrame (via
>>>> json/parquet dataframe loading).
>>>>
>>>> It is the DataFrame.rdd bit that is broken. I will file a JIRA.
>>>>
>>>> Does anyone know a workaround?
>>>>
>>>> On Mon, Mar 28, 2016 at 7:28 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> See this method:
>>>>>
>>>>> lazy val rdd: RDD[T] = {
>>>>>
>>>>> On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney
>>>>> <russell.jur...@gmail.com> wrote:
>>>>>
>>>>>> Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD.
>>>>>> This seems related to DataFrames. Is there a way to convert a DataFrame's
>>>>>> RDD to a 'normal' RDD?
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney
>>>>>> <russell.jur...@gmail.com> wrote:
>>>>>>
>>>>>>> I filed a JIRA <https://jira.mongodb.org/browse/HADOOP-276> in the
>>>>>>> mongo-hadoop project, but I'm curious if anyone else has seen this
>>>>>>> issue.
>>>>>>> Anyone have any idea what to do? I can't save to Mongo from PySpark. A
>>>>>>> contrived example works, but a dataframe does not.
>>>>>>>
>>>>>>> I activate pymongo_spark and load a dataframe:
>>>>>>>
>>>>>>> import pymongo
>>>>>>> import pymongo_spark
>>>>>>> # Important: activate pymongo_spark.
>>>>>>> pymongo_spark.activate()
>>>>>>>
>>>>>>> on_time_dataframe = sqlContext.read.parquet('../data/on_time_performance.parquet')
>>>>>>>
>>>>>>> Then I try saving to MongoDB in two ways:
>>>>>>>
>>>>>>> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>>>
>>>>>>> on_time_dataframe.rdd.saveAsNewAPIHadoopFile(
>>>>>>>     path='file://unused',
>>>>>>>     outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
>>>>>>>     keyClass='org.apache.hadoop.io.Text',
>>>>>>>     valueClass='org.apache.hadoop.io.MapWritable',
>>>>>>>     conf={"mongo.output.uri": "mongodb://localhost:27017/agile_data_science.on_time_performance"}
>>>>>>> )
>>>>>>>
>>>>>>> But I always get this error:
>>>>>>>
>>>>>>> In [7]: on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance')
>>>>>>>
>>>>>>> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to process : 1
>>>>>>> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at PythonRDD.scala:393
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at PythonRDD.scala:393) with 1 output partitions
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 (runJob at PythonRDD.scala:393)
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final stage: List()
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List()
>>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing parents
>>>>>>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 19.3 KB, free 249.2 KB)
>>>>>>> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as
bytes in memory (estimated size 9.7 KB, free 258.9 KB) >>>>>>> >>>>>>> 16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added >>>>>>> broadcast_5_piece0 in memory on localhost:59881 (size: 9.7 KB, free: >>>>>>> 511.1 >>>>>>> MB) >>>>>>> >>>>>>> 16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from >>>>>>> broadcast at DAGScheduler.scala:1006 >>>>>>> >>>>>>> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing >>>>>>> tasks from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43) >>>>>>> >>>>>>> 16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set >>>>>>> 2.0 with 1 tasks >>>>>>> >>>>>>> 16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0 >>>>>>> in stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes) >>>>>>> >>>>>>> 16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage >>>>>>> 2.0 (TID 2) >>>>>>> >>>>>>> 16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split: >>>>>>> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777 >>>>>>> >>>>>>> 16/03/28 18:04:06 INFO compress.CodecPool: Got brand-new >>>>>>> decompressor [.gz] >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO python.PythonRunner: Times: total = 1310, >>>>>>> boot = 1249, init = 58, finish = 3 >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO executor.Executor: Finished task 0.0 in stage >>>>>>> 2.0 (TID 2). 
4475 bytes result sent to driver >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Finished task 0.0 >>>>>>> in stage 2.0 (TID 2) in 1345 ms on localhost (1/1) >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet >>>>>>> 2.0, whose tasks have all completed, from pool >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 2 (runJob >>>>>>> at PythonRDD.scala:393) finished in 1.346 s >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 2 finished: >>>>>>> runJob at PythonRDD.scala:393, took 1.361003 s >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO spark.SparkContext: Starting job: take at >>>>>>> SerDeUtil.scala:231 >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Got job 3 (take at >>>>>>> SerDeUtil.scala:231) with 1 output partitions >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Final stage: >>>>>>> ResultStage 3 (take at SerDeUtil.scala:231) >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Parents of final >>>>>>> stage: List() >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Missing parents: >>>>>>> List() >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting >>>>>>> ResultStage 3 (MapPartitionsRDD[15] at mapPartitions at >>>>>>> SerDeUtil.scala:146), which has no missing parents >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6 stored >>>>>>> as values in memory (estimated size 19.6 KB, free 278.4 KB) >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6_piece0 >>>>>>> stored as bytes in memory (estimated size 9.8 KB, free 288.2 KB) >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO storage.BlockManagerInfo: Added >>>>>>> broadcast_6_piece0 in memory on localhost:59881 (size: 9.8 KB, free: >>>>>>> 511.1 >>>>>>> MB) >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO spark.SparkContext: Created broadcast 6 from >>>>>>> broadcast at DAGScheduler.scala:1006 >>>>>>> >>>>>>> 16/03/28 18:04:07 
INFO scheduler.DAGScheduler: Submitting 1 missing >>>>>>> tasks from ResultStage 3 (MapPartitionsRDD[15] at mapPartitions at >>>>>>> SerDeUtil.scala:146) >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Adding task set >>>>>>> 3.0 with 1 tasks >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Starting task 0.0 >>>>>>> in stage 3.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 2666 bytes) >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO executor.Executor: Running task 0.0 in stage >>>>>>> 3.0 (TID 3) >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO rdd.HadoopRDD: Input split: >>>>>>> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777 >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO compress.CodecPool: Got brand-new >>>>>>> decompressor [.gz] >>>>>>> >>>>>>> 16/03/28 18:04:07 ERROR executor.Executor: Exception in task 0.0 in >>>>>>> stage 3.0 (TID 3) >>>>>>> >>>>>>> net.razorvine.pickle.PickleException: expected zero arguments for >>>>>>> construction of ClassDict (for pyspark.sql.types._create_row) >>>>>>> >>>>>>> at >>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149) >>>>>>> >>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >>>>>>> >>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) >>>>>>> >>>>>>> at 
scala.collection.Iterator$class.foreach(Iterator.scala:727) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >>>>>>> >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >>>>>>> >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >>>>>>> >>>>>>> at scala.collection.TraversableOnce$class.to >>>>>>> (TraversableOnce.scala:273) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >>>>>>> >>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >>>>>>> >>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>>>>> >>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>> >>>>>>> 16/03/28 18:04:07 WARN scheduler.TaskSetManager: Lost task 
0.0 in >>>>>>> stage 3.0 (TID 3, localhost): net.razorvine.pickle.PickleException: >>>>>>> expected zero arguments for construction of ClassDict (for >>>>>>> pyspark.sql.types._create_row) >>>>>>> >>>>>>> at >>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149) >>>>>>> >>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >>>>>>> >>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) >>>>>>> >>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >>>>>>> >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >>>>>>> >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >>>>>>> >>>>>>> at scala.collection.TraversableOnce$class.to >>>>>>> (TraversableOnce.scala:273) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >>>>>>> >>>>>>> at 
scala.collection.AbstractIterator.toArray(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >>>>>>> >>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >>>>>>> >>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>>>>> >>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>> >>>>>>> >>>>>>> 16/03/28 18:04:07 ERROR scheduler.TaskSetManager: Task 0 in stage >>>>>>> 3.0 failed 1 times; aborting job >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet >>>>>>> 3.0, whose tasks have all completed, from pool >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Cancelling stage >>>>>>> 3 >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 3 (take >>>>>>> at SerDeUtil.scala:231) failed in 0.117 s >>>>>>> >>>>>>> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 3 failed: take at >>>>>>> SerDeUtil.scala:231, took 0.134593 s >>>>>>> >>>>>>> >>>>>>> --------------------------------------------------------------------------- >>>>>>> >>>>>>> Py4JJavaError Traceback (most recent >>>>>>> call last) >>>>>>> >>>>>>> <ipython-input-7-d1f984f17e27> in <module>() >>>>>>> >>>>>>> ----> 1 on_time_rdd.saveToMongoDB >>>>>>> ('mongodb://localhost:27017/agile_data_science.on_time_performance') >>>>>>> 
>>>>>>> >>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/lib/pymongo_spark.pyc in >>>>>>> saveToMongoDB(self, connection_string, config) >>>>>>> >>>>>>> 104 keyConverter >>>>>>> ='com.mongodb.spark.pickle.NoopConverter', >>>>>>> >>>>>>> 105 valueConverter >>>>>>> ='com.mongodb.spark.pickle.NoopConverter', >>>>>>> >>>>>>> --> 106 conf=conf) >>>>>>> >>>>>>> 107 >>>>>>> >>>>>>> 108 >>>>>>> >>>>>>> >>>>>>> >>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.pyc >>>>>>> in saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass, >>>>>>> valueClass, keyConverter, valueConverter, conf) >>>>>>> >>>>>>> 1372 >>>>>>> outputFormatClass, >>>>>>> >>>>>>> 1373 >>>>>>> keyClass, valueClass, >>>>>>> >>>>>>> -> 1374 >>>>>>> keyConverter, valueConverter, jconf) >>>>>>> >>>>>>> 1375 >>>>>>> >>>>>>> 1376 def saveAsHadoopDataset(self, conf, keyConverter=None, >>>>>>> valueConverter=None): >>>>>>> >>>>>>> >>>>>>> >>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py >>>>>>> in __call__(self, *args) >>>>>>> >>>>>>> 811 answer = self.gateway_client.send_command(command) >>>>>>> >>>>>>> 812 return_value = get_return_value( >>>>>>> >>>>>>> --> 813 answer, self.gateway_client, self.target_id, >>>>>>> self.name) >>>>>>> >>>>>>> 814 >>>>>>> >>>>>>> 815 for temp_arg in temp_args: >>>>>>> >>>>>>> >>>>>>> >>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/utils.pyc >>>>>>> in deco(*a, **kw) >>>>>>> >>>>>>> 43 def deco(*a, **kw): >>>>>>> >>>>>>> 44 try: >>>>>>> >>>>>>> ---> 45 return f(*a, **kw) >>>>>>> >>>>>>> 46 except py4j.protocol.Py4JJavaError as e: >>>>>>> >>>>>>> 47 s = e.java_exception.toString() >>>>>>> >>>>>>> >>>>>>> >>>>>>> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py >>>>>>> in get_return_value(answer, gateway_client, target_id, name) >>>>>>> >>>>>>> 306 raise Py4JJavaError( >>>>>>> >>>>>>> 307 "An error occurred while calling 
>>>>>>> {0}{1}{2}.\n". >>>>>>> >>>>>>> --> 308 format(target_id, ".", name), value) >>>>>>> >>>>>>> 309 else: >>>>>>> >>>>>>> 310 raise Py4JError( >>>>>>> >>>>>>> >>>>>>> Py4JJavaError: An error occurred while calling >>>>>>> z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile. >>>>>>> >>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure: >>>>>>> Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 >>>>>>> in >>>>>>> stage 3.0 (TID 3, localhost): net.razorvine.pickle.PickleException: >>>>>>> expected zero arguments for construction of ClassDict (for >>>>>>> pyspark.sql.types._create_row) >>>>>>> >>>>>>> at >>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149) >>>>>>> >>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >>>>>>> >>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) >>>>>>> >>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >>>>>>> >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >>>>>>> >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >>>>>>> >>>>>>> at 
scala.collection.TraversableOnce$class.to >>>>>>> (TraversableOnce.scala:273) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >>>>>>> >>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >>>>>>> >>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >>>>>>> >>>>>>> at >>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >>>>>>> >>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>> >>>>>>> >>>>>>> Driver stacktrace: >>>>>>> >>>>>>> at org.apache.spark.scheduler.DAGScheduler.org >>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) >>>>>>> >>>>>>> at >>>>>>> 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >>>>>>> >>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) >>>>>>> >>>>>>> at scala.Option.foreach(Option.scala:236) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) >>>>>>> >>>>>>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) >>>>>>> >>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) >>>>>>> >>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) >>>>>>> >>>>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) >>>>>>> >>>>>>> at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) >>>>>>> >>>>>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) >>>>>>> >>>>>>> at org.apache.spark.rdd.RDD.take(RDD.scala:1302) >>>>>>> >>>>>>> at >>>>>>> 
org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:231) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:775) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala) >>>>>>> >>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>> >>>>>>> at >>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>> >>>>>>> at >>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>> >>>>>>> at java.lang.reflect.Method.invoke(Method.java:497) >>>>>>> >>>>>>> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) >>>>>>> >>>>>>> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) >>>>>>> >>>>>>> at py4j.Gateway.invoke(Gateway.java:259) >>>>>>> >>>>>>> at >>>>>>> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) >>>>>>> >>>>>>> at py4j.commands.CallCommand.execute(CallCommand.java:79) >>>>>>> >>>>>>> at py4j.GatewayConnection.run(GatewayConnection.java:209) >>>>>>> >>>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>> >>>>>>> Caused by: net.razorvine.pickle.PickleException: expected zero >>>>>>> arguments for construction of ClassDict (for >>>>>>> pyspark.sql.types._create_row) >>>>>>> >>>>>>> at >>>>>>> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99) >>>>>>> >>>>>>> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150) >>>>>>> >>>>>>> at >>>>>>> 
org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149) >>>>>>> >>>>>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >>>>>>> >>>>>>> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) >>>>>>> >>>>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >>>>>>> >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >>>>>>> >>>>>>> at >>>>>>> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >>>>>>> >>>>>>> at scala.collection.TraversableOnce$class.to >>>>>>> (TraversableOnce.scala:273) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.to(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >>>>>>> >>>>>>> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >>>>>>> >>>>>>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >>>>>>> >>>>>>> at org.apache.spark.scheduler.Task.run(Task.scala:89) >>>>>>> >>>>>>> at >>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >>>>>>> >>>>>>> at >>>>>>> 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>>>
>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>>>
>>>>>>> ... 1 more
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com
>>>>>>> relato.io

--
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io