BTW one other thing — in your experience, Diana, which non-text InputFormats 
would be most useful to support in Python first? Would it be Parquet or Avro, 
simple SequenceFiles with the Hadoop Writable types, or something else? I think 
a per-file text input format that does the stuff we did here would also be good.

Matei


On Mar 18, 2014, at 3:27 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> Hi Diana,
> 
> This seems to work without the iter() in front if you just return 
> treeiterator directly. What happened when you didn't include it? 
> treeiterator should already be an iterator.
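> 
> For reference, a minimal variant of your parsefile below without the iter() 
> call (an untested sketch, same source files assumed):
> 
> import xml.etree.ElementTree as ET
> 
> def parsefile(iterator):
>     s = ''.join(str(i) for i in iterator)
>     tree = ET.fromstring(s)
>     # returning the result of getiterator() directly seems to be enough
>     return tree.getiterator("country")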
> 
> Anyway, this is a good example of mapPartitions. It's one where you want to 
> view the whole file as one object (one XML document here), so you couldn't 
> implement this using a flatMap, but you still want to return multiple 
> values. The MLlib 
> example you saw needs Python 2.7 because unfortunately that is a requirement 
> for our Python MLlib support (see 
> http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
>  We’d like to relax this later but we’re using some newer features of NumPy 
> and Python. The rest of PySpark works on 2.6.
> 
> In terms of the size in memory, both the string s and the XML tree 
> constructed from it need to fit in memory here, so you can't work on very 
> large individual XML files. You may be able to use a streaming XML parser 
> instead to extract elements from the data in a streaming fashion, without 
> ever materializing the whole tree. 
> http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader 
> is one example.
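> 
> A minimal sketch of that idea with xml.sax from the standard library 
> (assuming the same countries*.xml layout as in your code below; the 
> concatenated string still has to fit in memory, but no tree is built):
> 
> import xml.sax
> 
> class CountryHandler(xml.sax.ContentHandler):
>     def __init__(self):
>         xml.sax.ContentHandler.__init__(self)
>         self.countries = []
>     def startElement(self, name, attrs):
>         # keep just the attribute dict of each <country> element
>         if name == "country":
>             self.countries.append(dict(attrs.items()))
> 
> def parse_partition(iterator):
>     handler = CountryHandler()
>     # encode because textFile yields unicode lines and the SAX parser wants bytes
>     xml.sax.parseString(''.join(iterator).encode('utf-8'), handler)
>     return handler.countries
> 
> # mydata.mapPartitions(parse_partition).collect()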
> 
> Matei
> 
> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
> 
>> Well, if anyone is still following this, I've gotten the following code 
>> working, which in theory should allow me to parse whole XML files. (The 
>> problem was that I can't return the tree iterator directly.  I have to call 
>> iter().  Why?)
>> 
>> import xml.etree.ElementTree as ET
>> 
>> # two source files, format: <data><country name="...">...</country>...</data>
>> mydata = sc.textFile("file:/home/training/countries*.xml")
>> 
>> def parsefile(iterator):
>>     s = ''
>>     for i in iterator: s = s + str(i)
>>     tree = ET.fromstring(s)
>>     treeiterator = tree.getiterator("country")
>>     # why do I have to convert an iterator to an iterator? not sure, but required
>>     return iter(treeiterator)
>> 
>> mydata.mapPartitions(lambda x: parsefile(x)) \
>>       .map(lambda element: element.attrib).collect()
>> 
>> The output is what I expect:
>> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>> 
>> BUT I'm a bit concerned about the construction of the string "s".  How big 
>> can my file be before converting it to a string becomes problematic?
>> 
>> 
>> 
>> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>> Thanks, Matei.
>> 
>> In the context of this discussion, it would seem mapPartitions is essential, 
>> because it's the only way I'm going to be able to process each file as a 
>> whole: in our example, a large number of small XML files each need to be 
>> parsed as a whole file, because records are not required to be on a single 
>> line.
>> 
>> The theory makes sense but I'm still utterly lost as to how to implement it. 
>>  Unfortunately, there's only a single example of the use of mapPartitions in 
>> any of the Python example programs: the logistic regression example, which I 
>> can't run because it requires Python 2.7 and I'm on Python 2.6.  (Aside: I 
>> couldn't find any statement that Python 2.6 is unsupported... is it?)
>> 
>> I'd really really love to see a real life example of a Python use of 
>> mapPartitions.  I do appreciate the very simple examples you provided, but 
>> (perhaps because of my novice status on Python) I can't figure out how to 
>> translate those to a real world situation in which I'm building RDDs from 
>> files, not inline collections like [(1,2),(2,3)].
>> 
>> Also, you say that the function called in mapPartitions can return a 
>> collection OR an iterator.  I tried returning an iterator by calling 
>> ElementTree's getiterator() function, but still got the error telling me my 
>> object was not an iterator. 
>> 
>> If anyone has a real life example of mapPartitions returning a Python 
>> iterator, that would be fabulous.
>> 
>> Diana
>> 
>> 
>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <matei.zaha...@gmail.com> 
>> wrote:
>> Oh, I see, the problem is that the function you pass to mapPartitions must 
>> itself return an iterator or a collection. This is used so that you can 
>> return multiple output records for each input record. You can implement most 
>> of the existing map-like operations in Spark, such as map, filter, flatMap, 
>> etc, with mapPartitions, as well as new ones that might do a sliding window 
>> over each partition for example, or accumulate data across elements (e.g. to 
>> compute a sum).
>> 
>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this will 
>> work:
>> 
>> >>> data.mapPartitions(lambda x: x).collect()
>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>> 
>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a 
>> single list (like glom)
>> 
>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>> [3, 7]   # Sum each partition separately
>> 
>> However something like data.mapPartitions(lambda x: sum(x)).collect() will 
>> *not* work because sum returns a number, not an iterator. That’s why I put 
>> sum(x) inside a list above.
>> 
>> In practice mapPartitions is most useful if you want to share some data or 
>> work across the elements of a partition. For example, maybe you want to load 
>> a lookup table once from an external file and then check each element 
>> against it, or sum up a bunch of elements without allocating a lot of vector 
>> objects.
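>> 
>> A minimal sketch of that lookup-table pattern (the file path and its 
>> tab-separated layout are made up for illustration):
>> 
>> def label_partition(iterator):
>>     # load the table once per partition, not once per element
>>     lookup = {}
>>     with open('/path/to/lookup.txt') as f:
>>         for line in f:
>>             k, v = line.rstrip('\n').split('\t')
>>             lookup[k] = v
>>     # then stream through the partition's elements, yielding one result each
>>     for element in iterator:
>>         yield (element, lookup.get(element, 'unknown'))
>> 
>> # labeled = mydata.mapPartitions(label_partition)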
>> 
>> Matei
>> 
>> 
>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>> 
>> > "There’s also mapPartitions, which gives you an iterator for each 
>> > partition instead of an array. You can then return an iterator or list of 
>> > objects to produce from that."
>> >
>> > I confess, I was hoping for an example of just that, because I've not yet 
>> > been able to figure out how to use mapPartitions.  No doubt this is 
>> > because I'm a rank newcomer to Python, and haven't fully wrapped my head 
>> > around iterators.  All I get so far in my attempts to use mapPartitions is 
>> > the darned "suchnsuch is not an iterator" error.
>> >
>> > def myfunction(iterator): return [1,2,3]
>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>> >
>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <matei.zaha...@gmail.com> 
>> > wrote:
>> > Here’s an example of getting together all lines in a file as one string:
>> >
>> > $ cat dir/a.txt
>> > Hello
>> > world!
>> >
>> > $ cat dir/b.txt
>> > What's
>> > up??
>> >
>> > $ bin/pyspark
>> > >>> files = sc.textFile("dir")
>> >
>> > >>> files.collect()
>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line, not 
>> > what we want
>> >
>> > >>> files.glom().collect()
>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per file, 
>> > which is an array of lines
>> >
>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a single 
>> > string
>> >
>> > The glom() method groups all the elements of each partition of an RDD into 
>> > an array, giving you an RDD of arrays of objects. If your input is small 
>> > files, you always have one partition per file.
>> >
>> > There’s also mapPartitions, which gives you an iterator for each partition 
>> > instead of an array. You can then return an iterator or list of objects to 
>> > produce from that.
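>> >
>> > For instance, a rough mapPartitions equivalent of the glom()-based join 
>> > above (just a sketch, not run here):
>> >
>> > >>> files.mapPartitions(lambda lines: ["\n".join(lines)]).collect()
>> > # should give the same per-file strings as the glom() version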
>> >
>> > Matei
>> >
>> >
>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>> >
>> > > Thanks Matei.  That makes sense.  I have here a dataset of many many 
>> > > smallish XML files, so using mapPartitions that way would make sense.  
>> > > I'd love to see a code example though ...It's not as obvious to me how 
>> > > to do that as I probably should be.
>> > >
>> > > Thanks,
>> > > Diana
>> > >
>> > >
>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <matei.zaha...@gmail.com> 
>> > > wrote:
>> > > Hi Diana,
>> > >
>> > > Non-text input formats are only supported in Java and Scala right now, 
>> > > where you can use sparkContext.hadoopFile or .hadoopDataset to load data 
>> > > with any InputFormat that Hadoop MapReduce supports. In Python, you 
>> > > unfortunately only have textFile, which gives you one record per line. 
>> > > For JSON, you’d have to fit the whole JSON object on one line as you 
>> > > said. Hopefully we’ll also have some other forms of input soon.
>> > >
>> > > If your input is a collection of separate files (say many .xml files), 
>> > > you can also use mapPartitions on it to group together the lines because 
>> > > each input file will end up being a single dataset partition (or map 
>> > > task). This will let you concatenate the lines in each file and parse 
>> > > them as one XML object.
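>> > >
>> > > A rough sketch of that pattern (the path is a placeholder, and it assumes 
>> > > each small file ends up as its own partition, as described above):
>> > >
>> > > import xml.etree.ElementTree as ET
>> > >
>> > > def parse_whole_file(lines):
>> > >     # concatenate the partition's lines and parse them as one XML document
>> > >     root = ET.fromstring("".join(lines))
>> > >     return [root.tag]   # or extract whatever you need from the tree
>> > >
>> > > tags = sc.textFile("file:/path/to/xmldir").mapPartitions(parse_whole_file)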
>> > >
>> > > Matei
>> > >
>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>> > >
>> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks like 
>> > >> you are assuming that each line in foo.log contains a complete json 
>> > >> object?  (That is, that the data doesn't contain any records that are 
>> > >> split into multiple lines.)  If so, is that because you know that to be 
>> > >> true of your data?  Or did you do as Nicholas suggests and have some 
>> > >> preprocessing on the text input to flatten the data in that way?
>> > >>
>> > >> Thanks,
>> > >> Diana
>> > >>
>> > >>
>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <shankark+...@gmail.com> 
>> > >> wrote:
>> > >> Katrina,
>> > >>
>> > >> Not sure if this is what you had in mind, but here's some simple 
>> > >> pyspark code that I recently wrote to deal with JSON files.
>> > >>
>> > >> from pyspark import SparkContext, SparkConf
>> > >> from operator import add
>> > >> import json
>> > >> import random
>> > >> import numpy as np
>> > >>
>> > >>
>> > >> def concatenate_paragraphs(sentence_array):
>> > >>     return ' '.join(sentence_array).split(' ')
>> > >>
>> > >>
>> > >> logFile = 'foo.json'
>> > >> conf = SparkConf()
>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>> > >> sc = SparkContext(conf=conf)
>> > >>
>> > >> logData = sc.textFile(logFile).cache()
>> > >> num_lines = logData.count()
>> > >> print 'Number of lines: %d' % num_lines
>> > >>
>> > >> # each JSON line has the structure: {"key": ..., "paragraphs": [sentence1, sentence2, ...]}
>> > >> tm = logData.map(lambda s: (json.loads(s)['key'],
>> > >>                             len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>> > >>
>> > >> tm = tm.reduceByKey(lambda a, b: a + b)
>> > >>
>> > >> op = tm.collect()
>> > >> for key, num_words in op:
>> > >>     print 'state: %s, num_words: %d' % (key, num_words)
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark User 
>> > >> List] <[hidden email]> wrote:
>> > >> I don't actually have any data.  I'm writing a course that teaches 
>> > >> students how to do this sort of thing and am interested in looking at a 
>> > >> variety of real life examples of people doing things like that.  I'd 
>> > >> love to see some working code implementing the "obvious work-around" 
>> > >> you mention...do you have any to share?  It's an approach that makes a 
>> > >> lot of sense, and as I said, I'd love to not have to re-invent the 
>> > >> wheel if someone else has already written that code.  Thanks!
>> > >>
>> > >> Diana
>> > >>
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> 
>> > >> wrote:
>> > >> There was a previous discussion about this here:
>> > >>
>> > >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>> > >>
>> > >> How big are the XML or JSON files you're looking to deal with?
>> > >>
>> > >> It may not be practical to deserialize the entire document at once. In 
>> > >> that case an obvious work-around would be to have some kind of 
>> > >> pre-processing step that separates XML nodes/JSON objects with newlines 
>> > >> so that you can analyze the data with Spark in a "line-oriented 
>> > >> format". Your preprocessor wouldn't have to parse/deserialize the 
>> > >> massive document; it would just have to track open/closed tags/braces 
>> > >> to know when to insert a newline.
>> > >>
>> > >> Then you'd just open the line-delimited result and deserialize the 
>> > >> individual objects/nodes with map().
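>> > >>
>> > >> A rough sketch of that kind of preprocessor for JSON (the file names are 
>> > >> placeholders, and it ignores braces inside string values, which a real 
>> > >> one would have to handle):
>> > >>
>> > >> def flatten_json(in_path, out_path):
>> > >>     # write each top-level {...} object on its own line by tracking brace depth
>> > >>     depth = 0
>> > >>     buf = []
>> > >>     with open(in_path) as src:
>> > >>         with open(out_path, 'w') as dst:
>> > >>             for chunk in iter(lambda: src.read(8192), ''):
>> > >>                 for ch in chunk:
>> > >>                     if ch == '{':
>> > >>                         depth += 1
>> > >>                     if depth > 0:
>> > >>                         buf.append(ch)
>> > >>                     if ch == '}':
>> > >>                         depth -= 1
>> > >>                         if depth == 0:
>> > >>                             dst.write(''.join(buf) + '\n')
>> > >>                             buf = []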
>> > >>
>> > >> Nick
>> > >>
>> > >>
>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>> > >> Has anyone got a working example of a Spark application that analyzes 
>> > >> data in a non-line-oriented format, such as XML or JSON?  I'd like to 
>> > >> do this without re-inventing the wheel...anyone care to share?  Thanks!
>> > >>
>> > >> Diana
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >
>> > >
>> >
>> >
>> 
>> 
>> 
> 
