Hi Sean,

Thanks for the response. I changed from map to flatMap and now return a list from the function, as below:
    if (temp != "+9999" and re.match("[01459]", q)):
        return [(year, temp)]
    else:
        return []

Thanks,
Praveen

On Mon, Sep 22, 2014 at 9:26 PM, Sean Owen <so...@cloudera.com> wrote:
> If your map() sometimes does not emit an element, then you need to
> call flatMap() instead, and emit Some(value) (or any collection of
> values) if there is an element to return, or None otherwise.
>
> On Mon, Sep 22, 2014 at 4:50 PM, Praveen Sripati
> <praveensrip...@gmail.com> wrote:
> > During the map, if some of the rows are ignored based on certain conditions
> > (without any transformation), then there is a None record in the output RDD
> > for each ignored record. reduceByKey is not able to handle this type of
> > None record, hence the exception. I tried filter, but it is also not able
> > to handle the None record as input.
> >
> > How do I get around this?
> >
> > Thanks,
> > Praveen
> >
> > On Mon, Sep 22, 2014 at 6:09 PM, Praveen Sripati <praveensrip...@gmail.com>
> > wrote:
> >>
> >> Hi,
> >>
> >> I am writing a Spark program in Python to find the maximum temperature
> >> for a year, given a weather dataset. The program below throws an error
> >> when I try to execute it:
> >>
> >> TypeError: 'NoneType' object is not iterable
> >>
> >> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
> >> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
> >> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
> >> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >>
> >> If I replace +9999 with 9999 in the extractData function, the program
> >> executes without any error. The code with +9999 works with Hadoop
> >> streaming, but not with Spark pipes. How do I get around the problem?
> >> Does it have to do with the way encoding is done within Spark?
> >>
> >> -------------
> >>
> >> import re
> >> import sys
> >>
> >> from pyspark import SparkContext
> >>
> >> # Create the Spark context with the master details and the application name
> >> sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
> >>
> >> # Define the accumulators
> >> invalidRecordsAccumulator = sc.accumulator(0)
> >> validRecordsAccumulator = sc.accumulator(0)
> >>
> >> logFile = "hdfs://localhost:9000/user/bigdatavm/input"
> >>
> >> # Create an RDD from the input data in HDFS
> >> weatherData = sc.textFile(logFile)
> >>
> >> # Function to extract the data from the line
> >> # based on position and filter out the invalid records
> >> def extractData(line):
> >>     global invalidRecordsAccumulator
> >>     val = line.strip()
> >>     (year, temp, q) = (val[15:19], val[87:92], val[92:93])
> >>     if (temp != "+9999" and re.match("[01459]", q)):
> >>         validRecordsAccumulator += 1
> >>         return (year, temp)
> >>     else:
> >>         invalidRecordsAccumulator += 1
> >>
> >> # Transform the data to extract/filter and then find the max temperature
> >> max_temperature_per_year = weatherData.map(extractData).reduceByKey(
> >>     lambda a, b: a if int(a) > int(b) else b)
> >>
> >> # Save the RDD back into HDFS
> >> max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
> >>
> >> print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
> >> print "----->>>>>> Invalid records = %d" % invalidRecordsAccumulator.value
> >>
> >> Thanks,
> >> Praveen
> >
> >
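[Editor's note: for anyone reading this thread later, below is a minimal sketch of the flatMap-based version Sean describes, assembled from the snippets above. It reuses the master URL and HDFS paths from the original program and leaves out the accumulators to keep it short; it is illustrative, not a drop-in replacement for the full program.]

    import re
    from pyspark import SparkContext

    sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")

    # Create an RDD from the input data in HDFS (path from the original program)
    weatherData = sc.textFile("hdfs://localhost:9000/user/bigdatavm/input")

    def extractData(line):
        # Extract (year, temp, quality) by fixed positions from the record
        val = line.strip()
        (year, temp, q) = (val[15:19], val[87:92], val[92:93])
        if temp != "+9999" and re.match("[01459]", q):
            return [(year, temp)]   # valid record: emit a single (key, value) pair
        return []                   # invalid record: emit nothing

    # flatMap flattens the per-record lists, so ignored records simply
    # disappear and no None values ever reach reduceByKey
    max_temperature_per_year = weatherData.flatMap(extractData) \
        .reduceByKey(lambda a, b: a if int(a) > int(b) else b)

    max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")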