See this method: lazy val rdd: RDD[T] = {
On Mon, Mar 28, 2016 at 6:30 PM, Russell Jurney <russell.jur...@gmail.com> wrote: > Ok, I'm also unable to save to Elasticsearch using a dataframe's RDD. This > seems related to DataFrames. Is there a way to convert a DataFrame's RDD to > a 'normal' RDD? > > > On Mon, Mar 28, 2016 at 6:20 PM, Russell Jurney <russell.jur...@gmail.com> > wrote: > >> I filed a JIRA <https://jira.mongodb.org/browse/HADOOP-276> in the >> mongo-hadoop project, but I'm curious if anyone else has seen this issue. >> Anyone have any idea what to do? I can't save to Mongo from PySpark. A >> contrived example works, but a dataframe does not. >> >> I activate pymongo_spark and load a dataframe: >> >> import pymongo >> import pymongo_spark >> # Important: activate pymongo_spark. >> pymongo_spark.activate() >> >> on_time_dataframe = >> sqlContext.read.parquet('../data/on_time_performance.parquet') >> >> Then I try saving to MongoDB in two ways: >> >> >> on_time_dataframe.rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance') >> >> on_time_dataframe.rdd.saveAsNewAPIHadoopFile( >> path='file://unused', >> outputFormatClass='com.mongodb.hadoop.MongoOutputFormat', >> keyClass='org.apache.hadoop.io.Text', >> valueClass='org.apache.hadoop.io.MapWritable', >> conf={"mongo.output.uri": >> "mongodb://localhost:27017/agile_data_science.on_time_performance"} >> ) >> >> >> But I always get this error: >> >> In [7]: >> on_time_rdd.saveToMongoDB('mongodb://localhost:27017/agile_data_science.on_time_performance') >> >> 16/03/28 18:04:06 INFO mapred.FileInputFormat: Total input paths to >> process : 1 >> >> 16/03/28 18:04:06 INFO spark.SparkContext: Starting job: runJob at >> PythonRDD.scala:393 >> >> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Got job 2 (runJob at >> PythonRDD.scala:393) with 1 output partitions >> >> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Final stage: ResultStage 2 >> (runJob at PythonRDD.scala:393) >> >> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Parents of final stage: >> List() >> >> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Missing parents: List() >> >> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting ResultStage 2 >> (PythonRDD[13] at RDD at PythonRDD.scala:43), which has no missing parents >> >> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5 stored as >> values in memory (estimated size 19.3 KB, free 249.2 KB) >> >> 16/03/28 18:04:06 INFO storage.MemoryStore: Block broadcast_5_piece0 >> stored as bytes in memory (estimated size 9.7 KB, free 258.9 KB) >> >> 16/03/28 18:04:06 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 >> in memory on localhost:59881 (size: 9.7 KB, free: 511.1 MB) >> >> 16/03/28 18:04:06 INFO spark.SparkContext: Created broadcast 5 from >> broadcast at DAGScheduler.scala:1006 >> >> 16/03/28 18:04:06 INFO scheduler.DAGScheduler: Submitting 1 missing tasks >> from ResultStage 2 (PythonRDD[13] at RDD at PythonRDD.scala:43) >> >> 16/03/28 18:04:06 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 >> with 1 tasks >> >> 16/03/28 18:04:06 INFO scheduler.TaskSetManager: Starting task 0.0 in >> stage 2.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2666 bytes) >> >> 16/03/28 18:04:06 INFO executor.Executor: Running task 0.0 in stage 2.0 >> (TID 2) >> >> 16/03/28 18:04:06 INFO rdd.HadoopRDD: Input split: >> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777 >> >> 16/03/28 18:04:06 INFO compress.CodecPool: Got brand-new decompressor >> [.gz] >> >> 16/03/28 18:04:07 INFO python.PythonRunner: Times: total = 1310, boot = >> 1249, init = 58, finish = 3 >> >> 16/03/28 18:04:07 INFO executor.Executor: Finished task 0.0 in stage 2.0 >> (TID 2). 4475 bytes result sent to driver >> >> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Finished task 0.0 in >> stage 2.0 (TID 2) in 1345 ms on localhost (1/1) >> >> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, >> whose tasks have all completed, from pool >> >> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 2 (runJob at >> PythonRDD.scala:393) finished in 1.346 s >> >> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 2 finished: runJob at >> PythonRDD.scala:393, took 1.361003 s >> >> 16/03/28 18:04:07 INFO spark.SparkContext: Starting job: take at >> SerDeUtil.scala:231 >> >> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Got job 3 (take at >> SerDeUtil.scala:231) with 1 output partitions >> >> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Final stage: ResultStage 3 >> (take at SerDeUtil.scala:231) >> >> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Parents of final stage: >> List() >> >> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Missing parents: List() >> >> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting ResultStage 3 >> (MapPartitionsRDD[15] at mapPartitions at SerDeUtil.scala:146), which has >> no missing parents >> >> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6 stored as >> values in memory (estimated size 19.6 KB, free 278.4 KB) >> >> 16/03/28 18:04:07 INFO storage.MemoryStore: Block broadcast_6_piece0 >> stored as bytes in memory (estimated size 9.8 KB, free 288.2 KB) >> >> 16/03/28 18:04:07 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 >> in memory on localhost:59881 (size: 9.8 KB, free: 511.1 MB) >> >> 16/03/28 18:04:07 INFO spark.SparkContext: Created broadcast 6 from >> broadcast at DAGScheduler.scala:1006 >> >> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Submitting 1 missing tasks >> from ResultStage 3 (MapPartitionsRDD[15] at mapPartitions at >> SerDeUtil.scala:146) >> >> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 >> with 1 tasks >> >> 16/03/28 18:04:07 INFO scheduler.TaskSetManager: Starting task 0.0 in >> stage 3.0 (TID 3, localhost, partition 0,PROCESS_LOCAL, 2666 bytes) >> >> 16/03/28 18:04:07 INFO executor.Executor: Running task 0.0 in stage 3.0 >> (TID 3) >> >> 16/03/28 18:04:07 INFO rdd.HadoopRDD: Input split: >> file:/Users/rjurney/Software/Agile_Data_Code_2/data/On_Time_On_Time_Performance_2015.csv.gz:0+312456777 >> >> 16/03/28 18:04:07 INFO compress.CodecPool: Got brand-new decompressor >> [.gz] >> >> 16/03/28 18:04:07 ERROR executor.Executor: Exception in task 0.0 in stage >> 3.0 (TID 3) >> >> net.razorvine.pickle.PickleException: expected zero arguments for >> construction of ClassDict (for pyspark.sql.types._create_row) >> >> at >> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) >> >> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) >> >> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) >> >> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99) >> >> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) >> >> at >> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150) >> >> at >> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149) >> >> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >> >> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >> >> at >> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >> >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >> >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >> >> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) >> >> at scala.collection.AbstractIterator.to(Iterator.scala:1157) >> >> at >> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >> >> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) >> >> at >> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >> >> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) >> >> at >> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >> >> at >> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >> >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >> >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >> >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >> >> at org.apache.spark.scheduler.Task.run(Task.scala:89) >> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >> >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> >> at java.lang.Thread.run(Thread.java:745) >> >> 16/03/28 18:04:07 WARN scheduler.TaskSetManager: Lost task 0.0 in stage >> 3.0 (TID 3, localhost): net.razorvine.pickle.PickleException: expected zero >> arguments for construction of ClassDict (for pyspark.sql.types._create_row) >> >> at >> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) >> >> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) >> >> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) >> >> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99) >> >> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) >> >> at >> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150) >> >> at >> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149) >> >> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >> >> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >> >> at >> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >> >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >> >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >> >> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) >> >> at scala.collection.AbstractIterator.to(Iterator.scala:1157) >> >> at >> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >> >> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) >> >> at >> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >> >> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) >> >> at >> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >> >> at >> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >> >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >> >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >> >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >> >> at org.apache.spark.scheduler.Task.run(Task.scala:89) >> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >> >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> >> at java.lang.Thread.run(Thread.java:745) >> >> >> 16/03/28 18:04:07 ERROR scheduler.TaskSetManager: Task 0 in stage 3.0 >> failed 1 times; aborting job >> >> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, >> whose tasks have all completed, from pool >> >> 16/03/28 18:04:07 INFO scheduler.TaskSchedulerImpl: Cancelling stage 3 >> >> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: ResultStage 3 (take at >> SerDeUtil.scala:231) failed in 0.117 s >> >> 16/03/28 18:04:07 INFO scheduler.DAGScheduler: Job 3 failed: take at >> SerDeUtil.scala:231, took 0.134593 s >> >> >> --------------------------------------------------------------------------- >> >> Py4JJavaError Traceback (most recent call >> last) >> >> <ipython-input-7-d1f984f17e27> in <module>() >> >> ----> 1 on_time_rdd.saveToMongoDB >> ('mongodb://localhost:27017/agile_data_science.on_time_performance') >> >> >> /Users/rjurney/Software/Agile_Data_Code_2/lib/pymongo_spark.pyc in >> saveToMongoDB(self, connection_string, config) >> >> 104 keyConverter='com.mongodb.spark.pickle.NoopConverter', >> >> 105 valueConverter='com.mongodb.spark.pickle.NoopConverter', >> >> --> 106 conf=conf) >> >> 107 >> >> 108 >> >> >> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.pyc >> in saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass, >> valueClass, keyConverter, valueConverter, conf) >> >> 1372 >> outputFormatClass, >> >> 1373 keyClass, >> valueClass, >> >> -> 1374 >> keyConverter, valueConverter, jconf) >> >> 1375 >> >> 1376 def saveAsHadoopDataset(self, conf, keyConverter=None, >> valueConverter=None): >> >> >> >> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py >> in __call__(self, *args) >> >> 811 answer = self.gateway_client.send_command(command) >> >> 812 return_value = get_return_value( >> >> --> 813 answer, self.gateway_client, self.target_id, >> self.name) >> >> 814 >> >> 815 for temp_arg in temp_args: >> >> >> >> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/utils.pyc >> in deco(*a, **kw) >> >> 43 def deco(*a, **kw): >> >> 44 try: >> >> ---> 45 return f(*a, **kw) >> >> 46 except py4j.protocol.Py4JJavaError as e: >> >> 47 s = e.java_exception.toString() >> >> >> >> /Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py >> in get_return_value(answer, gateway_client, target_id, name) >> >> 306 raise Py4JJavaError( >> >> 307 "An error occurred while calling >> {0}{1}{2}.\n". >> >> --> 308 format(target_id, ".", name), value) >> >> 309 else: >> >> 310 raise Py4JError( >> >> >> Py4JJavaError: An error occurred while calling >> z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile. >> >> : org.apache.spark.SparkException: Job aborted due to stage failure: Task >> 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage >> 3.0 (TID 3, localhost): net.razorvine.pickle.PickleException: expected zero >> arguments for construction of ClassDict (for pyspark.sql.types._create_row) >> >> at >> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) >> >> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) >> >> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) >> >> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99) >> >> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) >> >> at >> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150) >> >> at >> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149) >> >> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >> >> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >> >> at >> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >> >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >> >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >> >> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) >> >> at scala.collection.AbstractIterator.to(Iterator.scala:1157) >> >> at >> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >> >> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) >> >> at >> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >> >> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) >> >> at >> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >> >> at >> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >> >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >> >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >> >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >> >> at org.apache.spark.scheduler.Task.run(Task.scala:89) >> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >> >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> >> at java.lang.Thread.run(Thread.java:745) >> >> >> Driver stacktrace: >> >> at org.apache.spark.scheduler.DAGScheduler.org >> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) >> >> at >> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) >> >> at >> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) >> >> at >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >> >> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> >> at >> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) >> >> at >> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) >> >> at >> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) >> >> at scala.Option.foreach(Option.scala:236) >> >> at >> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) >> >> at >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) >> >> at >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) >> >> at >> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) >> >> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) >> >> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) >> >> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) >> >> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) >> >> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) >> >> at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1328) >> >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) >> >> at >> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) >> >> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) >> >> at org.apache.spark.rdd.RDD.take(RDD.scala:1302) >> >> at >> org.apache.spark.api.python.SerDeUtil$.pythonToPairRDD(SerDeUtil.scala:231) >> >> at >> org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:775) >> >> at >> org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala) >> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> >> at java.lang.reflect.Method.invoke(Method.java:497) >> >> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) >> >> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) >> >> at py4j.Gateway.invoke(Gateway.java:259) >> >> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) >> >> at py4j.commands.CallCommand.execute(CallCommand.java:79) >> >> at py4j.GatewayConnection.run(GatewayConnection.java:209) >> >> at java.lang.Thread.run(Thread.java:745) >> >> Caused by: net.razorvine.pickle.PickleException: expected zero arguments >> for construction of ClassDict (for pyspark.sql.types._create_row) >> >> at >> net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23) >> >> at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707) >> >> at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175) >> >> at net.razorvine.pickle.Unpickler.load(Unpickler.java:99) >> >> at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112) >> >> at >> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:150) >> >> at >> org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:149) >> >> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >> >> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) >> >> at scala.collection.Iterator$class.foreach(Iterator.scala:727) >> >> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) >> >> at >> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) >> >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) >> >> at >> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) >> >> at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) >> >> at scala.collection.AbstractIterator.to(Iterator.scala:1157) >> >> at >> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) >> >> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) >> >> at >> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) >> >> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) >> >> at >> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >> >> at >> org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$28.apply(RDD.scala:1328) >> >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >> >> at >> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) >> >> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) >> >> at org.apache.spark.scheduler.Task.run(Task.scala:89) >> >> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) >> >> at >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) >> >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) >> >> ... 1 more >> >> >> -- >> Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io >> > > > > -- > Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io >