I have a file in S3 and I want to pair each of its lines with an index. Here is my code:
>>> input_data = sc.textFile('s3n://myinput', minPartitions=6).cache()
>>> N = input_data.count()
>>> index = sc.parallelize(range(N), 6)
>>> index.zip(input_data).collect()
...
14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at <stdin>:1) finished in 0.031 s
14/07/28 19:49:31 INFO SparkContext: Job finished: collect at <stdin>:1, took 0.039999707 s
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/root/spark/python/pyspark/rdd.py", line 584, in collect
    return list(self._collect_iterator_through_file(bytesInJava))
  File "/root/spark/python/pyspark/rdd.py", line 592, in _collect_iterator_through_file
    self.ctx._writeToFile(iterator, tempFile.name)
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py", line 537, in __call__
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.writeToFile.
: java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
        at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
        at org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
        at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
        at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:259)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:744)

As I see it, the job itself completes, so I don't understand where 'java.lang.String cannot be cast to [B' comes from. Zipping two ParallelCollectionRDDs works fine for me, but here one side of the zip is a MappedRDD produced by textFile(), and I'm not sure what is going on.

Also, why does PySpark not have zipWithIndex()?
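For comparison, this is the kind of zip that does work for me, shown here as a minimal example with made-up inline data rather than my real input:

>>> a = sc.parallelize(range(5), 2)        # ParallelCollectionRDD
>>> b = sc.parallelize(list('abcde'), 2)   # same length, same partition count
>>> a.zip(b).collect()
[(0, 'a'), (1, 'b'), (2, 'c'), (3, 'd'), (4, 'e')]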
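In the meantime I have been thinking about emulating zipWithIndex() by hand. Below is an untested sketch; the helper name zip_with_index is my own, and the two-pass scheme (count each partition, then attach offsets) is just my understanding of how the Scala version works:

def zip_with_index(rdd):
    # First pass: count the elements in each partition, so every
    # partition can know the global offset of its first element.
    counts = rdd.mapPartitionsWithIndex(
        lambda i, it: [(i, sum(1 for _ in it))]).collect()
    offsets = {}
    running = 0
    for i, n in sorted(counts):
        offsets[i] = running
        running += n

    # Second pass: emit (element, global_index) pairs.
    def attach(i, it):
        for j, x in enumerate(it):
            yield (x, offsets[i] + j)

    return rdd.mapPartitionsWithIndex(attach)

Thanks for any help.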