During the map, some of the rows are ignored based on a condition (no transformation is applied to them and nothing is returned), so the output RDD contains a None record for each ignored row. reduceByKey is not able to handle these None records, hence the exception. I tried filter, but it is also not able to handle None as input.
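For reference, here is a minimal repro of what I am seeing. This is illustrative only: the field offsets and sample data are made up, and it assumes a local SparkContext rather than the cluster from my original program.

import re

from pyspark import SparkContext

sc = SparkContext("local", "none_repro")

def extract(line):
    (year, temp) = (line[0:4], line[5:9])
    if temp != "9999":
        return (year, temp)
    # Ignored rows fall through here, so map() emits None for them.

pairs = sc.parallelize(["1950 0022", "1950 9999"]).map(extract)

# pairs now holds ('1950', '0022') and None; unpacking None as a
# key/value pair is what raises:
#   TypeError: 'NoneType' object is not iterable
print pairs.reduceByKey(lambda a, b: max(a, b)).collect()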
How to get around this?

Thanks,
Praveen

On Mon, Sep 22, 2014 at 6:09 PM, Praveen Sripati <praveensrip...@gmail.com> wrote:

> Hi,
>
> I am writing a Spark program in Python to find the maximum temperature
> for a year, given a weather dataset. The program below throws an error
> when I try to execute it:
>
> TypeError: 'NoneType' object is not iterable
>
> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>
> If I replace "+9999" with "9999" in the extractData function, the program
> executes without any error. The code with "+9999" works with Hadoop
> Streaming, but not with Spark pipes. How do I get around the problem?
> Does it have to do with the way encoding is done within Spark?
>
> -------------
>
> import re
> import sys
>
> from pyspark import SparkContext
>
> # Create the Spark context with the master details and the application name
> sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
>
> # Define the accumulators
> invalidRecordsAccumulator = sc.accumulator(0)
> validRecordsAccumulator = sc.accumulator(0)
>
> logFile = "hdfs://localhost:9000/user/bigdatavm/input"
>
> # Create an RDD from the input data in HDFS
> weatherData = sc.textFile(logFile)
>
> # Extract the fields from the line based on position and
> # filter out the invalid records
> def extractData(line):
>     global validRecordsAccumulator, invalidRecordsAccumulator
>     val = line.strip()
>     (year, temp, q) = (val[15:19], val[87:92], val[92:93])
>     if (temp != "+9999" and re.match("[01459]", q)):
>         validRecordsAccumulator += 1
>         return (year, temp)
>     else:
>         invalidRecordsAccumulator += 1
>
> # Transform the data to extract/filter and then find the max temperature
> max_temperature_per_year = weatherData.map(extractData).reduceByKey(
>     lambda a, b: a if int(a) > int(b) else b)
>
> # Save the RDD back into HDFS
> max_temperature_per_year.saveAsTextFile(
>     "hdfs://localhost:9000/user/bigdatavm/output")
>
> print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
> print "----->>>>>> Invalid records = %d" % invalidRecordsAccumulator.value
>
> Thanks,
> Praveen
>
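P.S. One thing I was wondering about, as an untested sketch adapted from the extractData function in the quoted program above (accumulators omitted for brevity): would returning a list and using flatMap instead of map, so that ignored rows produce no output record at all rather than a None, be the right approach?

import re

# Untested sketch: with flatMap, a row for which the function
# returns [] contributes nothing to the output RDD.
def extractData(line):
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if temp != "+9999" and re.match("[01459]", q):
        return [(year, temp)]  # valid row: one (year, temp) record
    return []                  # ignored row: no record, no None

# weatherData as defined in the quoted program
max_temperature_per_year = weatherData.flatMap(extractData) \
    .reduceByKey(lambda a, b: a if int(a) > int(b) else b)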