Hi Sean,

Thanks for the response. I changed map to flatMap and now return a list from
the function, as below:

if (temp != "+9999" and re.match("[01459]", q)):
    return [(year, temp)]
else:
    return []
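
Roughly, the whole function now looks like this (just a sketch, reusing the
accumulator names and field positions from my earlier program below, and
switching the call from map to flatMap):

def extractData(line):
    global validRecordsAccumulator, invalidRecordsAccumulator
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if (temp != "+9999" and re.match("[01459]", q)):
        validRecordsAccumulator += 1
        # one-element list for records we keep
        return [(year, temp)]
    else:
        invalidRecordsAccumulator += 1
        # empty list, which flatMap simply drops
        return []

max_temperature_per_year = weatherData.flatMap(extractData).reduceByKey(
    lambda a, b: a if int(a) > int(b) else b)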

Thanks,
Praveen

On Mon, Sep 22, 2014 at 9:26 PM, Sean Owen <so...@cloudera.com> wrote:

> If your map() sometimes does not emit an element, then you need to
> call flatMap() instead, and emit Some(value) (or any collection of
> values) if there is an element to return, or None otherwise.
>
> On Mon, Sep 22, 2014 at 4:50 PM, Praveen Sripati
> <praveensrip...@gmail.com> wrote:
> > During the map, if some of the rows are ignored based on some condition
> > (without any transformation), then there is a None record in the output RDD
> > for each ignored record. reduceByKey is not able to handle this kind of
> > None record, hence the exception. I tried filter, but it is also not able
> > to handle the None record as input.
> >
> > How to get around this?
> >
> > Thanks,
> > Praveen
> >
> > On Mon, Sep 22, 2014 at 6:09 PM, Praveen Sripati <praveensrip...@gmail.com>
> > wrote:
> >>
> >> Hi,
> >>
> >> I am writing a Spark program in Python to find the maximum temperature for
> >> a year, given a weather dataset. The program below throws the following
> >> error when I try to execute it.
> >>
> >> TypeError: 'NoneType' object is not iterable
> >>
> >>         org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
> >>         org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
> >>         org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
> >>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >>
> >> If I replace +9999 with 9999 in the extractData function, the program
> >> executes without any error. The code with +9999 works with Hadoop
> >> Streaming, but not with Spark pipes. How do I get around this problem?
> >> Does it have to do with the way encoding is done within Spark?
> >>
> >> -------------
> >>
> >> import re
> >> import sys
> >>
> >> from pyspark import SparkContext
> >>
> >> #Create Spark Context with the master details and the application name
> >> sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
> >>
> >> #define the accumulator
> >> invalidRecordsAccumulator = sc.accumulator(0)
> >> validRecordsAccumulator = sc.accumulator(0)
> >>
> >> logFile = "hdfs://localhost:9000/user/bigdatavm/input"
> >>
> >> #Create an RDD from the input data in HDFS
> >> weatherData = sc.textFile(logFile)
> >>
> >> #function to extract the data from the line
> >> #based on position and filter out the invalid records
> >> def extractData(line):
> >>
> >>     global invalidRecordsAccumulator
> >>     val = line.strip()
> >>     (year, temp, q) = (val[15:19], val[87:92], val[92:93])
> >>     if (temp != "+9999" and re.match("[01459]", q)):
> >>         validRecordsAccumulator += 1
> >>         return (year, temp)
> >>     else:
> >>         invalidRecordsAccumulator += 1
> >>
> >>
> >> #Transform the data to extract/filter and then find the max temperature
> >> max_temperature_per_year = weatherData.map(extractData).reduceByKey(
> >>     lambda a, b: a if int(a) > int(b) else b)
> >>
> >> #Save the RDD back into HDFS
> >> max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
> >>
> >> print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
> >> print "----->>>>>> Invalid records = %d" %
> invalidRecordsAccumulator.value
> >>
> >> Thanks,
> >> Praveen
> >
> >
>
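
PS: for anyone else hitting this, a minimal standalone sketch of the
difference (assuming an existing SparkContext named sc; the toy data and keys
are made up):

pairs = sc.parallelize(["a", "b", "c"])

# map emits exactly one output element per input element, so skipped rows
# become None and reduceByKey fails trying to unpack them as (key, value)
bad = pairs.map(lambda x: (x, 1) if x != "c" else None)
# bad.reduceByKey(lambda a, b: a + b).collect()
#   -> TypeError: 'NoneType' object is not iterable

# flatMap flattens whatever iterable the function returns, so an empty list
# silently drops the record and reduceByKey only ever sees (key, value) pairs
good = pairs.flatMap(lambda x: [(x, 1)] if x != "c" else [])
print good.reduceByKey(lambda a, b: a + b).collect()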
