Ayan, which version of Python are you using? I am using 2.6.9 and I can't find the generateFileType and getSchemaFor functions. Thanks for your help.
On Fri, Oct 7, 2016 at 1:17 AM, ayan guha <guha.a...@gmail.com> wrote:
> Hi
>
> generateFileType(filename) returns a FileType.
>
> getSchemaFor(FileType) returns the schema for that FileType.
>
> This for loop DOES NOT process files sequentially. It creates a dataframe
> for each file type, one type at a time, and each dataframe covers all files
> of that type.
>
> On Fri, Oct 7, 2016 at 12:08 AM, Arun Patel <arunp.bigd...@gmail.com> wrote:
>
>> Thanks Ayan. Couple of questions:
>>
>> 1) What do the generateFileType and getSchemaFor functions look like?
>> 2) The 'for loop' is processing files sequentially, right? My requirement
>> is to process all files at the same time.
>>
>> On Thu, Oct 6, 2016 at 8:52 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> In this case, if you look, t[1] is NOT the file content, as I have added
>>> a "FileType" field. So this collect only brings in the list of file
>>> types, which should be fine.
>>>
>>> On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <arunp.bigd...@gmail.com> wrote:
>>>
>>>> Thanks Ayan. I am really concerned about the collect:
>>>>
>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>
>>>> This will ship all files to the driver, right? That must be inefficient.
>>>>
>>>> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I think you are heading in the right direction. What is missing is: you
>>>>> do not need to create a DF for each file. You can group files with
>>>>> similar structures together (by filtering on the file name) and then
>>>>> create a DF for each type of file. Also, creating a DF straight on
>>>>> wholeTextFiles seems wasteful to me. I would probably do it like this:
>>>>>
>>>>> rdd1 = sc.wholeTextFiles(inputpath).map(lambda t: (t[0], generateFileType(t[0]), t[1]))
>>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>>
>>>>> DFList = []
>>>>>
>>>>> for k in types:
>>>>>     df = rdd1.filter(lambda t: t[1] == k).toDF(schema=getSchemaFor(k))
>>>>>     DFList.append(df)
>>>>>
>>>>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigd...@gmail.com> wrote:
>>>>>
>>>>>> My Pyspark program currently identifies the list of files in a
>>>>>> directory (using the Python Popen command with hadoop fs -ls
>>>>>> arguments). For each file, a Dataframe is created and processed. This
>>>>>> is sequential. How do I process all files in parallel? Please note
>>>>>> that every file in the directory has a different schema, so depending
>>>>>> on the file name, different logic is used for each file. That means I
>>>>>> cannot really create one Dataframe for all these files and iterate
>>>>>> over each row. Using wholeTextFiles seems like a good approach to me,
>>>>>> but I am not sure how to create a DataFrame from it. For example, is
>>>>>> there a way to do something like below?
>>>>>>
>>>>>> def createDFProcess(myrdd):
>>>>>>     df = sqlCtx.read.json(myrdd)
>>>>>>     df.show()
>>>>>>
>>>>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
>>>>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>>>>
>>>>>> The above code does not work. I get the error "TypeError:
>>>>>> 'JavaPackage' object is not callable". How do I make it work?
>>>>>>
>>>>>> Or is there a better approach?
>>>>>>
>>>>>> -Arun
>
> --
> Best Regards,
> Ayan Guha
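
For anyone stuck on the same point: generateFileType and getSchemaFor are not part of PySpark or of any Python version; they are helpers you are expected to write yourself. Below is a minimal sketch of one way they could look, assuming the file type is encoded in the file name (everything before the first underscore) and using placeholder schemas. The directory path, naming convention, type names, and field names are all illustrative assumptions, not anything from the thread.

    # Sketch only: generateFileType and getSchemaFor are user-defined helpers,
    # not PySpark APIs. File-name convention and schemas below are assumptions.
    import os

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import StructType, StructField, StringType

    sc = SparkContext(appName="per-type-dataframes")
    sqlCtx = SQLContext(sc)   # needed so rdd.toDF() is available


    def generateFileType(filename):
        """Derive a file type from the file name.

        Assumes e.g. '/input/dir1/orders_2016.json' -> 'orders', i.e. the type
        is the part of the base name before the first underscore."""
        return os.path.basename(filename).split('_')[0]


    def getSchemaFor(file_type):
        """Return the schema to use for a given file type.

        Placeholder schemas that match the (fname, ftype, fcontent) tuples
        built below; replace them with the real per-type schemas."""
        schemas = {
            'orders':    StructType([StructField('fname', StringType()),
                                     StructField('ftype', StringType()),
                                     StructField('fcontent', StringType())]),
            'customers': StructType([StructField('fname', StringType()),
                                     StructField('ftype', StringType()),
                                     StructField('fcontent', StringType())]),
        }
        return schemas[file_type]


    # Ayan's pipeline with the syntax fixed (wholeTextFiles, closing parenthesis).
    rdd1 = sc.wholeTextFiles('/input/dir1') \
             .map(lambda t: (t[0], generateFileType(t[0]), t[1]))

    # t[1] is the file type, not the file content, so this collect only brings
    # the small list of distinct types back to the driver.
    types = rdd1.map(lambda t: t[1]).distinct().collect()

    # One DataFrame per file type. k is bound as a default argument so each
    # lazily evaluated filter keeps its own type instead of the loop's last value.
    DFList = []
    for k in types:
        df = rdd1.filter(lambda t, k=k: t[1] == k).toDF(schema=getSchemaFor(k))
        DFList.append(df)

Note the k=k default argument in the filter: the filters are evaluated lazily, so without it every DataFrame would end up filtering on the last value of k. And as Ayan says, the loop itself only builds the DataFrame definitions; the files of each type are read in parallel across the cluster once an action is run on each DataFrame.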