I'd second the request for Avro support in Python first, followed by
Parquet.


On Wed, Mar 19, 2014 at 2:14 PM, Evgeny Shishkin <itparan...@gmail.com>wrote:

>
> On 19 Mar 2014, at 19:54, Diana Carroll <dcarr...@cloudera.com> wrote:
>
> Actually, thinking more on this question, Matei: I'd definitely say
> support for Avro.  There's a lot of interest in this!!
>
>
> Agree, and parquet as default Cloudera Impala format.
>
>
>
>
> On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia <matei.zaha...@gmail.com>wrote:
>
>> BTW one other thing -- in your experience, Diana, which non-text
>> InputFormats would be most useful to support in Python first? Would it be
>> Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or
>> something else? I think a per-file text input format that does the stuff we
>> did here would also be good.
>>
>> Matei
>>
>>
>> On Mar 18, 2014, at 3:27 PM, Matei Zaharia <matei.zaha...@gmail.com>
>> wrote:
>>
>> Hi Diana,
>>
>> This seems to work without the iter() in front if you just return
>> treeiterator. What happened when you didn't include that? Treeiterator
>> should return an iterator.
>>
>> Anyway, this is a good example of mapPartitions. It's one where you want
>> to view the whole file as one object (one XML here), so you couldn't
>> implement this using a flatMap, but you still want to return multiple
>> values. The MLlib example you saw needs Python 2.7 because unfortunately
>> that is a requirement for our Python MLlib support (see
>> http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
>> We'd like to relax this later but we're using some newer features of NumPy
>> and Python. The rest of PySpark works on 2.6.
>>
>> In terms of the size in memory, here both the string s and the XML tree
>> constructed from it need to fit in, so you can't work on very large
>> individual XML files. You may be able to use a streaming XML parser instead
>> to extract elements from the data in a streaming fashion, without every
>> materializing the whole tree.
>> http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreaderis
>>  one example.
>>
>> Matei
>>
>> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>
>> Well, if anyone is still following this, I've gotten the following code
>> working which in theory should allow me to parse whole XML files: (the
>> problem was that I can't return the tree iterator directly.  I have to call
>> iter().  Why?)
>>
>> import xml.etree.ElementTree as ET
>>
>> # two source files, format <data> <country
>> name="...">...</country>...</data>
>> mydata=sc.textFile("file:/home/training/countries*.xml")
>>
>> def parsefile(iterator):
>>     s = ''
>>     for i in iterator: s = s + str(i)
>>     tree = ET.fromstring(s)
>>     treeiterator = tree.getiterator("country")
>>     # why to I have to convert an iterator to an iterator?  not sure but
>> required
>>     return iter(treeiterator)
>>
>> mydata.mapPartitions(lambda x: parsefile(x)).map(lambda element:
>> element.attrib).collect()
>>
>> The output is what I expect:
>> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>>
>> BUT I'm a bit concerned about the construction of the string "s".  How
>> big can my file be before converting it to a string becomes problematic?
>>
>>
>>
>> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dcarr...@cloudera.com>wrote:
>>
>>> Thanks, Matei.
>>>
>>> In the context of this discussion, it would seem mapParitions is
>>> essential, because it's the only way I'm going to be able to process each
>>> file as a whole, in our example of a large number of small XML files which
>>> need to be parsed as a whole file because records are not required to be on
>>> a single line.
>>>
>>> The theory makes sense but I'm still utterly lost as to how to implement
>>> it.  Unfortunately there's only a single example of the use of
>>> mapPartitions in any of the Python example programs, which is the log
>>> regression example, which I can't run because it requires Python 2.7 and
>>> I'm on Python 2.6.  (aside: I couldn't find any statement that Python 2.6
>>> is unsupported...is it?)
>>>
>>> I'd really really love to see a real life example of a Python use of
>>> mapPartitions.  I do appreciate the very simple examples you provided, but
>>> (perhaps because of my novice status on Python) I can't figure out how to
>>> translate those to a real world situation in which I'm building RDDs from
>>> files, not inline collections like [(1,2),(2,3)].
>>>
>>> Also, you say that the function called in mapPartitions can return a
>>> collection OR an iterator.  I tried returning an iterator by calling
>>> ElementTree getiterator function, but still got the error telling me my
>>> object was not an iterator.
>>>
>>> If anyone has a real life example of mapPartitions returning a Python
>>> iterator, that would be fabulous.
>>>
>>> Diana
>>>
>>>
>>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia 
>>> <matei.zaha...@gmail.com>wrote:
>>>
>>>> Oh, I see, the problem is that the function you pass to mapPartitions
>>>> must itself return an iterator or a collection. This is used so that you
>>>> can return multiple output records for each input record. You can implement
>>>> most of the existing map-like operations in Spark, such as map, filter,
>>>> flatMap, etc, with mapPartitions, as well as new ones that might do a
>>>> sliding window over each partition for example, or accumulate data across
>>>> elements (e.g. to compute a sum).
>>>>
>>>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this
>>>> will work:
>>>>
>>>> >>> data.mapPartitions(lambda x: x).collect()
>>>> [1, 2, 3, 4]   # Just return the same iterator, doing nothing
>>>>
>>>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>>>> [[1, 2], [3, 4]]   # Group together the elements of each partition in a
>>>> single list (like glom)
>>>>
>>>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>>>> [3, 7]   # Sum each partition separately
>>>>
>>>> However something like data.mapPartitions(lambda x: sum(x)).collect()
>>>> will *not* work because sum returns a number, not an iterator. That's why I
>>>> put sum(x) inside a list above.
>>>>
>>>> In practice mapPartitions is most useful if you want to share some data
>>>> or work across the elements. For example maybe you want to load a lookup
>>>> table once from an external file and then check each element in it, or sum
>>>> up a bunch of elements without allocating a lot of vector objects.
>>>>
>>>> Matei
>>>>
>>>>
>>>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dcarr...@cloudera.com>
>>>> wrote:
>>>>
>>>> > "There's also mapPartitions, which gives you an iterator for each
>>>> partition instead of an array. You can then return an iterator or list of
>>>> objects to produce from that."
>>>> >
>>>> > I confess, I was hoping for an example of just that, because i've not
>>>> yet been able to figure out how to use mapPartitions.  No doubt this is
>>>> because i'm a rank newcomer to Python, and haven't fully wrapped my head
>>>> around iterators.  All I get so far in my attempts to use mapPartitions is
>>>> the darned "suchnsuch is not an iterator" error.
>>>> >
>>>> > def myfunction(iterator): return [1,2,3]
>>>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>>>> >
>>>> >
>>>> >
>>>> >
>>>> >
>>>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <
>>>> matei.zaha...@gmail.com> wrote:
>>>> > Here's an example of getting together all lines in a file as one
>>>> string:
>>>> >
>>>> > $ cat dir/a.txt
>>>> > Hello
>>>> > world!
>>>> >
>>>> > $ cat dir/b.txt
>>>> > What's
>>>> > up??
>>>> >
>>>> > $ bin/pyspark
>>>> > >>> files = sc.textFile("dir")
>>>> >
>>>> > >>> files.collect()
>>>> > [u'Hello', u'world!', u"What's", u'up??']   # one element per line,
>>>> not what we want
>>>> >
>>>> > >>> files.glom().collect()
>>>> > [[u'Hello', u'world!'], [u"What's", u'up??']]   # one element per
>>>> file, which is an array of lines
>>>> >
>>>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>>>> > [u'Hello\nworld!', u"What's\nup??"]    # join back each file into a
>>>> single string
>>>> >
>>>> > The glom() method groups all the elements of each partition of an RDD
>>>> into an array, giving you an RDD of arrays of objects. If your input is
>>>> small files, you always have one partition per file.
>>>> >
>>>> > There's also mapPartitions, which gives you an iterator for each
>>>> partition instead of an array. You can then return an iterator or list of
>>>> objects to produce from that.
>>>> >
>>>> > Matei
>>>> >
>>>> >
>>>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dcarr...@cloudera.com>
>>>> wrote:
>>>> >
>>>> > > Thanks Matei.  That makes sense.  I have here a dataset of many
>>>> many smallish XML files, so using mapPartitions that way would make sense.
>>>>  I'd love to see a code example though ...It's not as obvious to me how to
>>>> do that as I probably should be.
>>>> > >
>>>> > > Thanks,
>>>> > > Diana
>>>> > >
>>>> > >
>>>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <
>>>> matei.zaha...@gmail.com> wrote:
>>>> > > Hi Diana,
>>>> > >
>>>> > > Non-text input formats are only supported in Java and Scala right
>>>> now, where you can use sparkContext.hadoopFile or .hadoopDataset to load
>>>> data with any InputFormat that Hadoop MapReduce supports. In Python, you
>>>> unfortunately only have textFile, which gives you one record per line. For
>>>> JSON, you'd have to fit the whole JSON object on one line as you said.
>>>> Hopefully we'll also have some other forms of input soon.
>>>> > >
>>>> > > If your input is a collection of separate files (say many .xml
>>>> files), you can also use mapPartitions on it to group together the lines
>>>> because each input file will end up being a single dataset partition (or
>>>> map task). This will let you concatenate the lines in each file and parse
>>>> them as one XML object.
>>>> > >
>>>> > > Matei
>>>> > >
>>>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dcarr...@cloudera.com>
>>>> wrote:
>>>> > >
>>>> > >> Thanks, Krakna, very helpful.  The way I read the code, it looks
>>>> like you are assuming that each line in foo.log contains a complete json
>>>> object?  (That is, that the data doesn't contain any records that are split
>>>> into multiple lines.)  If so, is that because you know that to be true of
>>>> your data?  Or did you do as Nicholas suggests and have some preprocessing
>>>> on the text input to flatten the data in that way?
>>>> > >>
>>>> > >> Thanks,
>>>> > >> Diana
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <shankark+...@gmail.com>
>>>> wrote:
>>>> > >> Katrina,
>>>> > >>
>>>> > >> Not sure if this is what you had in mind, but here's some simple
>>>> pyspark code that I recently wrote to deal with JSON files.
>>>> > >>
>>>> > >> from pyspark import SparkContext, SparkConf
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> from operator import add
>>>> > >> import json
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> import random
>>>> > >> import numpy as np
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> def concatenate_paragraphs(sentence_array):
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> return ' '.join(sentence_array).split(' ')
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> logFile = 'foo.json'
>>>> > >> conf = SparkConf()
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory",
>>>> "1g")
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> sc = SparkContext(conf=conf)
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> logData = sc.textFile(logFile).cache()
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> num_lines = logData.count()
>>>> > >> print 'Number of lines: %d' % num_lines
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> # JSON object has the structure: {"key": {'paragraphs':
>>>> [sentence1, sentence2, ...]}}
>>>> > >> tm = logData.map(lambda s: (json.loads(s)['key'],
>>>> len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> tm = tm.reduceByKey(lambda _, x: _ + x)
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> op = tm.collect()
>>>> > >> for key, num_words in op:
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>      print 'state: %s, num_words: %d' % (state, num_words)
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark
>>>> User List] <[hidden email]> wrote:
>>>> > >> I don't actually have any data.  I'm writing a course that teaches
>>>> students how to do this sort of thing and am interested in looking at a
>>>> variety of real life examples of people doing things like that.  I'd love
>>>> to see some working code implementing the "obvious work-around" you
>>>> mention...do you have any to share?  It's an approach that makes a lot of
>>>> sense, and as I said, I'd love to not have to re-invent the wheel if
>>>> someone else has already written that code.  Thanks!
>>>> > >>
>>>> > >> Diana
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden
>>>> email]> wrote:
>>>> > >> There was a previous discussion about this here:
>>>> > >>
>>>> > >>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>>> > >>
>>>> > >> How big are the XML or JSON files you're looking to deal with?
>>>> > >>
>>>> > >> It may not be practical to deserialize the entire document at
>>>> once. In that case an obvious work-around would be to have some kind of
>>>> pre-processing step that separates XML nodes/JSON objects with newlines so
>>>> that you can analyze the data with Spark in a "line-oriented format". Your
>>>> preprocessor wouldn't have to parse/deserialize the massive document; it
>>>> would just have to track open/closed tags/braces to know when to insert a
>>>> newline.
>>>> > >>
>>>> > >> Then you'd just open the line-delimited result and deserialize the
>>>> individual objects/nodes with map().
>>>> > >>
>>>> > >> Nick
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]>
>>>> wrote:
>>>> > >> Has anyone got a working example of a Spark application that
>>>> analyzes data in a non-line-oriented format, such as XML or JSON?  I'd like
>>>> to do this without re-inventing the wheel...anyone care to share?  Thanks!
>>>> > >>
>>>> > >> Diana
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> If you reply to this email, your message will be added to the
>>>> discussion below:
>>>> > >>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/example-of-non-line-oriented-input-data-tp2750p2752.html
>>>> > >> To start a new topic under Apache Spark User List, email [hidden
>>>> email]
>>>> > >> To unsubscribe from Apache Spark User List, click here.
>>>> > >> NAML
>>>> > >>
>>>> > >>
>>>> > >> View this message in context: Re: example of non-line oriented
>>>> input data?
>>>> > >> Sent from the Apache Spark User List mailing list archive at
>>>> Nabble.com <http://nabble.com/>.
>>>> > >>
>>>> > >
>>>> > >
>>>> >
>>>> >
>>>>
>>>>
>>>
>>
>>
>>
>
>

Reply via email to