I'd second the request for Avro support in Python first, followed by Parquet.
On Wed, Mar 19, 2014 at 2:14 PM, Evgeny Shishkin <itparan...@gmail.com> wrote:

> On 19 Mar 2014, at 19:54, Diana Carroll <dcarr...@cloudera.com> wrote:
>
> Actually, thinking more on this question, Matei: I'd definitely say
> support for Avro. There's a lot of interest in this!
>
> Agree, and parquet as default Cloudera Impala format.
>
>
> On Tue, Mar 18, 2014 at 8:14 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>
>> BTW one other thing -- in your experience, Diana, which non-text
>> InputFormats would be most useful to support in Python first? Would it be
>> Parquet or Avro, simple SequenceFiles with the Hadoop Writable types, or
>> something else? I think a per-file text input format that does the stuff we
>> did here would also be good.
>>
>> Matei
>>
>>
>> On Mar 18, 2014, at 3:27 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>
>> Hi Diana,
>>
>> This seems to work without the iter() in front if you just return
>> treeiterator. What happened when you didn't include that? treeiterator
>> should return an iterator.
>>
>> Anyway, this is a good example of mapPartitions. It's one where you want
>> to view the whole file as one object (one XML document here), so you
>> couldn't implement this using a flatMap, but you still want to return
>> multiple values. The MLlib example you saw needs Python 2.7 because
>> unfortunately that is a requirement for our Python MLlib support (see
>> http://spark.incubator.apache.org/docs/0.9.0/python-programming-guide.html#libraries).
>> We'd like to relax this later, but we're using some newer features of
>> NumPy and Python. The rest of PySpark works on 2.6.
>>
>> In terms of the size in memory, here both the string s and the XML tree
>> constructed from it need to fit in, so you can't work on very large
>> individual XML files. You may be able to use a streaming XML parser
>> instead to extract elements from the data in a streaming fashion, without
>> ever materializing the whole tree.
>> http://docs.python.org/2/library/xml.sax.reader.html#module-xml.sax.xmlreader
>> is one example.
>>
>> Matei
>>
>> On Mar 18, 2014, at 7:49 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>
>> Well, if anyone is still following this, I've gotten the following code
>> working, which in theory should allow me to parse whole XML files. (The
>> problem was that I can't return the tree iterator directly; I have to
>> call iter(). Why?)
>>
>> import xml.etree.ElementTree as ET
>>
>> # two source files, format:
>> # <data><country name="...">...</country>...</data>
>> mydata = sc.textFile("file:/home/training/countries*.xml")
>>
>> def parsefile(iterator):
>>     s = ''
>>     for i in iterator:
>>         s = s + str(i)
>>     tree = ET.fromstring(s)
>>     treeiterator = tree.getiterator("country")
>>     # why do I have to convert an iterator to an iterator?
>>     # not sure, but required
>>     return iter(treeiterator)
>>
>> mydata.mapPartitions(lambda x: parsefile(x)).map(
>>     lambda element: element.attrib).collect()
>>
>> The output is what I expect:
>> [{'name': 'Liechtenstein'}, {'name': 'Singapore'}, {'name': 'Panama'}]
>>
>> BUT I'm a bit concerned about the construction of the string "s". How
>> big can my file be before converting it to a string becomes problematic?
>>
>>
>> On Tue, Mar 18, 2014 at 9:41 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>
>>> Thanks, Matei.
>>>
>>> In the context of this discussion, it would seem mapPartitions is
>>> essential, because it's the only way I'm going to be able to process
>>> each file as a whole, in our example of a large number of small XML
>>> files which need to be parsed as a whole file because records are not
>>> required to be on a single line.
>>>
>>> The theory makes sense, but I'm still utterly lost as to how to
>>> implement it.
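The streaming approach Matei points at can be sketched without Spark using ElementTree's iterparse, which yields elements as their end tags arrive and lets you discard each one, so the whole tree is never materialized. The XML snippet and the stream_countries helper below are made up for illustration, mirroring the countries example:

```python
import io
import xml.etree.ElementTree as ET

# A small stand-in for one of the countries*.xml files in the thread.
xml_data = ('<data>'
            '<country name="Liechtenstein"/>'
            '<country name="Singapore"/>'
            '<country name="Panama"/>'
            '</data>')

def stream_countries(fileobj):
    """Yield the attributes of each <country> without building the full tree."""
    for event, elem in ET.iterparse(fileobj, events=('end',)):
        if elem.tag == 'country':
            yield dict(elem.attrib)
            elem.clear()  # drop the element's contents so memory stays bounded

names = [c['name'] for c in stream_countries(io.StringIO(xml_data))]
print(names)  # ['Liechtenstein', 'Singapore', 'Panama']
```

For a real multi-gigabyte file you would pass an open file object rather than a StringIO, and memory use stays proportional to one element, not the document.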
>>> Unfortunately there's only a single example of the use of
>>> mapPartitions in any of the Python example programs, which is the
>>> logistic regression example, which I can't run because it requires
>>> Python 2.7 and I'm on Python 2.6. (Aside: I couldn't find any statement
>>> that Python 2.6 is unsupported... is it?)
>>>
>>> I'd really, really love to see a real-life example of a Python use of
>>> mapPartitions. I do appreciate the very simple examples you provided,
>>> but (perhaps because of my novice status in Python) I can't figure out
>>> how to translate those to a real-world situation in which I'm building
>>> RDDs from files, not inline collections like [(1,2),(2,3)].
>>>
>>> Also, you say that the function called in mapPartitions can return a
>>> collection OR an iterator. I tried returning an iterator by calling
>>> ElementTree's getiterator function, but still got the error telling me
>>> my object was not an iterator.
>>>
>>> If anyone has a real-life example of mapPartitions returning a Python
>>> iterator, that would be fabulous.
>>>
>>> Diana
>>>
>>>
>>> On Mon, Mar 17, 2014 at 6:17 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>
>>>> Oh, I see, the problem is that the function you pass to mapPartitions
>>>> must itself return an iterator or a collection. This is used so that
>>>> you can return multiple output records for each input record. You can
>>>> implement most of the existing map-like operations in Spark, such as
>>>> map, filter, flatMap, etc., with mapPartitions, as well as new ones
>>>> that might do a sliding window over each partition, for example, or
>>>> accumulate data across elements (e.g. to compute a sum).
>>>>
>>>> For example, if you have data = sc.parallelize([1, 2, 3, 4], 2), this
>>>> will work:
>>>>
>>>> >>> data.mapPartitions(lambda x: x).collect()
>>>> [1, 2, 3, 4]  # just return the same iterator, doing nothing
>>>>
>>>> >>> data.mapPartitions(lambda x: [list(x)]).collect()
>>>> [[1, 2], [3, 4]]  # group the elements of each partition into a
>>>>                   # single list (like glom)
>>>>
>>>> >>> data.mapPartitions(lambda x: [sum(x)]).collect()
>>>> [3, 7]  # sum each partition separately
>>>>
>>>> However, something like data.mapPartitions(lambda x: sum(x)).collect()
>>>> will *not* work, because sum returns a number, not an iterator. That's
>>>> why I put sum(x) inside a list above.
>>>>
>>>> In practice, mapPartitions is most useful if you want to share some
>>>> data or work across the elements. For example, maybe you want to load
>>>> a lookup table once from an external file and then check each element
>>>> against it, or sum up a bunch of elements without allocating a lot of
>>>> vector objects.
>>>>
>>>> Matei
>>>>
>>>>
>>>> On Mar 17, 2014, at 11:25 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>>
>>>> > "There's also mapPartitions, which gives you an iterator for each
>>>> > partition instead of an array. You can then return an iterator or
>>>> > list of objects to produce from that."
>>>> >
>>>> > I confess, I was hoping for an example of just that, because I've
>>>> > not yet been able to figure out how to use mapPartitions. No doubt
>>>> > this is because I'm a rank newcomer to Python and haven't fully
>>>> > wrapped my head around iterators. All I get so far in my attempts to
>>>> > use mapPartitions is the darned "suchnsuch is not an iterator" error.
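Matei's rule above -- the function passed to mapPartitions must return an iterator or collection, which then gets flattened -- can be reproduced outside Spark in plain Python. The map_partitions function below is a hypothetical stand-in for the RDD method, not PySpark code:

```python
from itertools import chain

# A stand-in for sc.parallelize([1, 2, 3, 4], 2): two partitions.
partitions = [[1, 2], [3, 4]]

def map_partitions(f, parts):
    """Call f once per partition with an iterator over that partition,
    then flatten whatever iterables f returns -- mimicking RDD.mapPartitions."""
    return list(chain.from_iterable(f(iter(p)) for p in parts))

print(map_partitions(lambda it: it, partitions))          # [1, 2, 3, 4]
print(map_partitions(lambda it: [list(it)], partitions))  # [[1, 2], [3, 4]]
print(map_partitions(lambda it: [sum(it)], partitions))   # [3, 7]

# lambda it: sum(it) would fail here too: chain can't iterate over a
# number, just as Spark can't -- hence the "is not an iterator" error.
```

The flattening step is exactly why sum(x) must be wrapped in a list: the framework iterates over whatever the function returns.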
>>>> >
>>>> > def myfunction(iterator): return [1,2,3]
>>>> > mydata.mapPartitions(lambda x: myfunction(x)).take(2)
>>>> >
>>>> >
>>>> > On Mon, Mar 17, 2014 at 1:57 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>> > Here's an example of getting together all lines in a file as one
>>>> > string:
>>>> >
>>>> > $ cat dir/a.txt
>>>> > Hello
>>>> > world!
>>>> >
>>>> > $ cat dir/b.txt
>>>> > What's
>>>> > up??
>>>> >
>>>> > $ bin/pyspark
>>>> > >>> files = sc.textFile("dir")
>>>> >
>>>> > >>> files.collect()
>>>> > [u'Hello', u'world!', u"What's", u'up??']  # one element per line,
>>>> >                                            # not what we want
>>>> >
>>>> > >>> files.glom().collect()
>>>> > [[u'Hello', u'world!'], [u"What's", u'up??']]  # one element per
>>>> >                                # file, which is an array of lines
>>>> >
>>>> > >>> files.glom().map(lambda a: "\n".join(a)).collect()
>>>> > [u'Hello\nworld!', u"What's\nup??"]  # join each file back into a
>>>> >                                      # single string
>>>> >
>>>> > The glom() method groups all the elements of each partition of an
>>>> > RDD into an array, giving you an RDD of arrays of objects. If your
>>>> > input is small files, you always have one partition per file.
>>>> >
>>>> > There's also mapPartitions, which gives you an iterator for each
>>>> > partition instead of an array. You can then return an iterator or
>>>> > list of objects to produce from that.
>>>> >
>>>> > Matei
>>>> >
>>>> >
>>>> > On Mar 17, 2014, at 10:46 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>> >
>>>> > > Thanks, Matei. That makes sense. I have here a dataset of many,
>>>> > > many smallish XML files, so using mapPartitions that way would
>>>> > > make sense. I'd love to see a code example, though... it's not as
>>>> > > obvious to me how to do that as it probably should be.
>>>> > >
>>>> > > Thanks,
>>>> > > Diana
>>>> > >
>>>> > >
>>>> > > On Mon, Mar 17, 2014 at 1:02 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:
>>>> > > Hi Diana,
>>>> > >
>>>> > > Non-text input formats are only supported in Java and Scala right
>>>> > > now, where you can use sparkContext.hadoopFile or .hadoopDataset
>>>> > > to load data with any InputFormat that Hadoop MapReduce supports.
>>>> > > In Python, you unfortunately only have textFile, which gives you
>>>> > > one record per line. For JSON, you'd have to fit the whole JSON
>>>> > > object on one line as you said. Hopefully we'll also have some
>>>> > > other forms of input soon.
>>>> > >
>>>> > > If your input is a collection of separate files (say, many .xml
>>>> > > files), you can also use mapPartitions on it to group together the
>>>> > > lines, because each input file will end up being a single dataset
>>>> > > partition (or map task). This will let you concatenate the lines
>>>> > > in each file and parse them as one XML object.
>>>> > >
>>>> > > Matei
>>>> > >
>>>> > > On Mar 17, 2014, at 9:52 AM, Diana Carroll <dcarr...@cloudera.com> wrote:
>>>> > >
>>>> > >> Thanks, Krakna, very helpful. The way I read the code, it looks
>>>> > >> like you are assuming that each line in foo.log contains a
>>>> > >> complete JSON object? (That is, that the data doesn't contain any
>>>> > >> records that are split into multiple lines.) If so, is that
>>>> > >> because you know that to be true of your data? Or did you do as
>>>> > >> Nicholas suggests and have some preprocessing on the text input
>>>> > >> to flatten the data in that way?
>>>> > >>
>>>> > >> Thanks,
>>>> > >> Diana
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 12:09 PM, Krakna H <shankark+...@gmail.com> wrote:
>>>> > >> Katrina,
>>>> > >>
>>>> > >> Not sure if this is what you had in mind, but here's some simple
>>>> > >> PySpark code that I recently wrote to deal with JSON files.
>>>> > >>
>>>> > >> from pyspark import SparkContext, SparkConf
>>>> > >> from operator import add
>>>> > >> import json
>>>> > >> import random
>>>> > >> import numpy as np
>>>> > >>
>>>> > >> def concatenate_paragraphs(sentence_array):
>>>> > >>     return ' '.join(sentence_array).split(' ')
>>>> > >>
>>>> > >> logFile = 'foo.json'
>>>> > >> conf = SparkConf()
>>>> > >> conf.setMaster("spark://cluster-master:7077").setAppName("example").set("spark.executor.memory", "1g")
>>>> > >> sc = SparkContext(conf=conf)
>>>> > >>
>>>> > >> logData = sc.textFile(logFile).cache()
>>>> > >> num_lines = logData.count()
>>>> > >> print 'Number of lines: %d' % num_lines
>>>> > >>
>>>> > >> # JSON object has the structure:
>>>> > >> # {"key": {'paragraphs': [sentence1, sentence2, ...]}}
>>>> > >> tm = logData.map(lambda s: (json.loads(s)['key'],
>>>> > >>                             len(concatenate_paragraphs(json.loads(s)['paragraphs']))))
>>>> > >> tm = tm.reduceByKey(add)
>>>> > >> op = tm.collect()
>>>> > >> for key, num_words in op:
>>>> > >>     print 'key: %s, num_words: %d' % (key, num_words)
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 11:58 AM, Diana Carroll [via Apache Spark
>>>> > >> User List] <[hidden email]> wrote:
>>>> > >> I don't actually have any data.
>>>> > >> I'm writing a course that teaches students how to do this sort
>>>> > >> of thing and am interested in looking at a variety of real-life
>>>> > >> examples of people doing things like that. I'd love to see some
>>>> > >> working code implementing the "obvious work-around" you
>>>> > >> mention... do you have any to share? It's an approach that makes
>>>> > >> a lot of sense, and, as I said, I'd love not to have to reinvent
>>>> > >> the wheel if someone else has already written that code. Thanks!
>>>> > >>
>>>> > >> Diana
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 11:35 AM, Nicholas Chammas <[hidden email]> wrote:
>>>> > >> There was a previous discussion about this here:
>>>> > >>
>>>> > >> http://apache-spark-user-list.1001560.n3.nabble.com/Having-Spark-read-a-JSON-file-td1963.html
>>>> > >>
>>>> > >> How big are the XML or JSON files you're looking to deal with?
>>>> > >>
>>>> > >> It may not be practical to deserialize the entire document at
>>>> > >> once. In that case an obvious work-around would be to have some
>>>> > >> kind of pre-processing step that separates XML nodes/JSON objects
>>>> > >> with newlines so that you can analyze the data with Spark in a
>>>> > >> "line-oriented format". Your preprocessor wouldn't have to
>>>> > >> parse/deserialize the massive document; it would just have to
>>>> > >> track open/closed tags/braces to know when to insert a newline.
>>>> > >>
>>>> > >> Then you'd just open the line-delimited result and deserialize
>>>> > >> the individual objects/nodes with map().
>>>> > >>
>>>> > >> Nick
>>>> > >>
>>>> > >>
>>>> > >> On Mon, Mar 17, 2014 at 11:18 AM, Diana Carroll <[hidden email]> wrote:
>>>> > >> Has anyone got a working example of a Spark application that
>>>> > >> analyzes data in a non-line-oriented format, such as XML or JSON?
>>>> > >> I'd like to do this without reinventing the wheel... anyone care
>>>> > >> to share? Thanks!
>>>> > >>
>>>> > >> Diana
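Nick's brace-tracking preprocessor, quoted above, can be sketched in plain Python for the JSON case. This is only an illustration under the assumption that the input is a stream of concatenated top-level objects; split_json_objects and the sample doc are invented for the example. It tracks brace depth, skips braces inside string literals, and never parses the document itself:

```python
import json

def split_json_objects(text):
    """Split a stream of concatenated top-level JSON objects into a list,
    one object per entry, by tracking brace depth -- no full parse needed."""
    out = []
    depth = 0
    in_string = False
    escape = False
    start = 0
    for i, ch in enumerate(text):
        if in_string:
            if escape:
                escape = False
            elif ch == '\\':
                escape = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == '{':
            depth += 1
        elif ch == '}':
            depth -= 1
            if depth == 0:  # a top-level object just closed
                out.append(text[start:i + 1].strip())
                start = i + 1
    return out

# A stand-in for a document with no newline discipline; note the brace
# inside a string, which must not be counted.
doc = '{"a": 1, "s": "a } inside"} {"b": 2}'
lines = split_json_objects(doc)
print('\n'.join(lines))            # the "line-oriented format" Spark wants
records = [json.loads(l) for l in lines]
print(records)  # [{'a': 1, 's': 'a } inside'}, {'b': 2}]
```

Joining the pieces with newlines yields a file that sc.textFile can read one record per line, after which each line deserializes with a plain json.loads in map().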