During the map, if some of the rows are ignored based on a condition (nothing
is returned for them), then the output RDD contains a None record for each
ignored row. reduceByKey is not able to handle these None records, hence the
exception. I tried filter, but it was also not able to handle the None
records as input.

How to get around this?
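
Is something along these lines the right approach? (extractDataFlat below is
just a sketch of a flatMap-based variant of extractData from the quoted mail;
I have not verified it.)

def extractDataFlat(line):
    # Same parsing as extractData, but returns a list so that skipped
    # rows produce an empty result instead of a None record.
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match("[01459]", q)):
        return [(year, temp)]
    return []

# Option 1: flatMap flattens the per-row lists, so no None record
# ever reaches reduceByKey.
pairs = weatherData.flatMap(extractDataFlat)

# Option 2: keep map and extractData as they are, but drop the None
# records with an explicit test before reduceByKey.
# pairs = weatherData.map(extractData).filter(lambda rec: rec is not None)

max_temperature_per_year = pairs.reduceByKey(
    lambda a, b: a if int(a) > int(b) else b)

flatMap looks cleaner to me since the skipped rows never enter the RDD at
all, but I may be missing something.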

Thanks,
Praveen

On Mon, Sep 22, 2014 at 6:09 PM, Praveen Sripati <praveensrip...@gmail.com>
wrote:

> Hi,
>
> I am writing a Spark program in Python to find the maximum temperature for
> each year in a weather dataset. The program below throws the following
> error when I execute it.
>
> TypeError: 'NoneType' object is not iterable
>
>         org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
>         org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
>         org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>
> If I replace +9999 with 9999 in the extractData function, the program
> executes without any error. The code with +9999 works with Hadoop
> Streaming, but not with Spark pipes. How do I get around the problem? Does
> it have to do with the way encoding is done within Spark?
>
> -------------
>
> import re
> import sys
>
> from pyspark import SparkContext
>
> #Create Spark Context with the master details and the application name
> sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
>
> #define the accumulators
> invalidRecordsAccumulator = sc.accumulator(0)
> validRecordsAccumulator = sc.accumulator(0)
>
> logFile = "hdfs://localhost:9000/user/bigdatavm/input"
>
> #Create an RDD from the input data in HDFS
> weatherData = sc.textFile(logFile)
>
> #function to extract the data from the line
> #based on position and filter out the invalid records
>
> def extractData(line):
>     global invalidRecordsAccumulator
>     val = line.strip()
>     (year, temp, q) = (val[15:19], val[87:92], val[92:93])
>     if (temp != "+9999" and re.match("[01459]", q)):
>         validRecordsAccumulator += 1
>         return (year, temp)
>     else:
>         invalidRecordsAccumulator += 1
>
>
> #Transform the data to extract/filter and then find the max temperature
> max_temperature_per_year = weatherData.map(extractData) \
>     .reduceByKey(lambda a, b: a if int(a) > int(b) else b)
>
> #Save the RDD back into HDFS
>
> max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
>
> print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
> print "----->>>>>> Invalid records = %d" % invalidRecordsAccumulator.value
>
> Thanks,
> Praveen
>
