Hi,

I am writing a Spark program in Python to find the maximum temperature per
year from a weather dataset. The program below throws the following error
when I execute it:

TypeError: 'NoneType' object is not iterable


        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)

If I replace +9999 with 9999 in the extractData function, the program
executes without any error. The code with +9999 works in Hadoop Streaming,
but not with Spark. How do I get around this problem? Does it have to do
with the way encoding is done within Spark?
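For reference, one workaround I was considering (an untested sketch, assuming
the problem is that extractData returns None for the filtered-out records,
which then cannot be unpacked as a (year, temp) pair during the shuffle) is to
switch from map to flatMap so that invalid records yield nothing. It uses the
same imports and accumulators as the full program below:

def extractData(line):
    global validRecordsAccumulator, invalidRecordsAccumulator
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if temp != "+9999" and re.match("[01459]", q):
        validRecordsAccumulator += 1
        return [(year, temp)]   #one (year, temp) pair for a valid record
    else:
        invalidRecordsAccumulator += 1
        return []               #nothing for an invalid record

#flatMap flattens the returned lists, so only valid pairs reach reduceByKey
max_temperature_per_year = weatherData.flatMap(extractData).reduceByKey(
    lambda a, b: a if int(a) > int(b) else b)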

-------------

import re
import sys

from pyspark import SparkContext

#Create Spark Context with the master details and the application name
sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")

#define the accumulators
invalidRecordsAccumulator = sc.accumulator(0)
validRecordsAccumulator = sc.accumulator(0)

logFile = "hdfs://localhost:9000/user/bigdatavm/input"

#Create an RDD from the input data in HDFS
weatherData = sc.textFile(logFile)

#function to extract the data from the line
#based on position and filter out the invalid records
def extractData(line):
    global invalidRecordsAccumulator
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match("[01459]", q)):
        validRecordsAccumulator += 1
        return (year, temp)
    else:
        invalidRecordsAccumulator += 1


#Transform the data to extract/filter and then find the max temperature
max_temperature_per_year = weatherData.map(extractData).reduceByKey(lambda a, b: a if int(a) > int(b) else b)

#Save the RDD back into HDFS
max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")

print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
print "----->>>>>> Invalid records = %d" % invalidRecordsAccumulator.value

Thanks,
Praveen
