If your map() sometimes does not emit an element, then you need to call flatMap() instead, and emit a collection containing the value if there is an element to return, or an empty collection otherwise. (In Scala that would be Some(value) or None, since Option acts as a collection; in Python, return a list such as [(year, temp)] or [].)
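For example, using the extractData/weatherData names from your program quoted below, a minimal sketch of the flatMap() version in PySpark (the emitted "collection" is just a plain Python list):

import re

# Return a one-element list for a valid record, an empty list otherwise.
# flatMap() flattens whatever iterable each call returns, so the empty
# list simply drops the record and no None ever reaches reduceByKey().
def extractData(line):
    val = line.strip()
    (year, temp, q) = (val[15:19], val[87:92], val[92:93])
    if temp != "+9999" and re.match("[01459]", q):
        return [(year, temp)]
    return []

max_temperature_per_year = weatherData.flatMap(extractData).reduceByKey(
    lambda a, b: a if int(a) > int(b) else b)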
On Mon, Sep 22, 2014 at 4:50 PM, Praveen Sripati <praveensrip...@gmail.com> wrote:

> During the map, if some of the rows are ignored based on some conditions
> (without any transformation), then there is a None record in the output
> RDD for each ignored record. reduceByKey is not able to handle this type
> of None record, hence the exception. I tried filter, but it is also not
> able to handle the None record as input.
>
> How do I get around this?
>
> Thanks,
> Praveen
>
> On Mon, Sep 22, 2014 at 6:09 PM, Praveen Sripati <praveensrip...@gmail.com>
> wrote:
>>
>> Hi,
>>
>> I am writing a Spark program in Python to find the maximum temperature
>> for a year, given a weather dataset. The program below throws an error
>> when I try to execute it:
>>
>> TypeError: 'NoneType' object is not iterable
>>
>> org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
>> org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
>> org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>>
>> If I replace +9999 with 9999 in the extractData function, the program
>> executes without any error. The code with +9999 works with Hadoop
>> Streaming, but not with Spark. How do I get around the problem? Does it
>> have to do with the way encoding is done within Spark?
>>
>> -------------
>>
>> import re
>> import sys
>>
>> from pyspark import SparkContext
>>
>> # Create the Spark context with the master details and the application name
>> sc = SparkContext("spark://bigdata-vm:7077", "max_temperature")
>>
>> # Define the accumulators
>> invalidRecordsAccumulator = sc.accumulator(0)
>> validRecordsAccumulator = sc.accumulator(0)
>>
>> logFile = "hdfs://localhost:9000/user/bigdatavm/input"
>>
>> # Create an RDD from the input data in HDFS
>> weatherData = sc.textFile(logFile)
>>
>> # Extract the fields from the line based on position
>> # and filter out the invalid records
>> def extractData(line):
>>     global validRecordsAccumulator, invalidRecordsAccumulator
>>     val = line.strip()
>>     (year, temp, q) = (val[15:19], val[87:92], val[92:93])
>>     if temp != "+9999" and re.match("[01459]", q):
>>         validRecordsAccumulator += 1
>>         return (year, temp)
>>     else:
>>         invalidRecordsAccumulator += 1
>>
>> # Transform the data to extract/filter and then find the max temperature
>> max_temperature_per_year = weatherData.map(extractData).reduceByKey(
>>     lambda a, b: a if int(a) > int(b) else b)
>>
>> # Save the RDD back into HDFS
>> max_temperature_per_year.saveAsTextFile("hdfs://localhost:9000/user/bigdatavm/output")
>>
>> print "----->>>>>> Valid records = %d" % validRecordsAccumulator.value
>> print "----->>>>>> Invalid records = %d" % invalidRecordsAccumulator.value
>>
>> Thanks,
>> Praveen