Hi,

Sorry for the confusion, but I meant those functions to be written by you. They are your business logic or ETL logic.

On 10 Oct 2016 21:06, "Arun Patel" <arunp.bigd...@gmail.com> wrote:
> Ayan, which version of Python are you using? I am using 2.6.9 and I don't
> find the generateFileType and getSchemaFor functions. Thanks for your help.
>
> On Fri, Oct 7, 2016 at 1:17 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> generateFileType(filename) returns a FileType.
>>
>> getSchemaFor(FileType) returns the schema for that FileType.
>>
>> This for loop DOES NOT process files sequentially. It creates one
>> dataframe per file type, each covering all files of that type.
>>
>> On Fri, Oct 7, 2016 at 12:08 AM, Arun Patel <arunp.bigd...@gmail.com> wrote:
>>
>>> Thanks Ayan. Couple of questions:
>>>
>>> 1) What do the generateFileType and getSchemaFor functions look like?
>>> 2) The 'for loop' is processing files sequentially, right? My requirement
>>> is to process all files at the same time.
>>>
>>> On Thu, Oct 6, 2016 at 8:52 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> In this case, if you look, t[1] is NOT the file content, as I have added
>>>> a "FileType" field. So this collect only brings in the list of file
>>>> types; it should be fine.
>>>>
>>>> On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <arunp.bigd...@gmail.com> wrote:
>>>>
>>>>> Thanks Ayan. I am really concerned about the collect:
>>>>>
>>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>>
>>>>> This will ship all files to the driver, right? It must be inefficient.
>>>>>
>>>>> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I think you are heading in the correct direction. What is missing is:
>>>>>> you do not need to create a DF for each file. You can group files with
>>>>>> similar structures together (by filtering on the file name) and then
>>>>>> create a DF for each type of file. Also, creating a DF straight on
>>>>>> wholeTextFiles seems wasteful to me. I would probably do it like this:
>>>>>>
>>>>>> rdd1 = sc.wholeTextFiles(inputpath).map(lambda t:
>>>>>>     (t[0], generateFileType(t[0]), t[1]))
>>>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>>>
>>>>>> DFList = []
>>>>>>
>>>>>> for k in types:
>>>>>>     df = rdd1.filter(lambda t: t[1] == k).toDF(schema=getSchemaFor(k))
>>>>>>     DFList.append(df)
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigd...@gmail.com> wrote:
>>>>>>
>>>>>>> My PySpark program currently identifies the list of files in a
>>>>>>> directory (using the Python Popen command with hadoop fs -ls
>>>>>>> arguments). For each file, a DataFrame is created and processed. This
>>>>>>> is sequential. How can I process all files in parallel? Please note
>>>>>>> that every file in the directory has a different schema, so, depending
>>>>>>> on the file name, different logic is used for each file. So I cannot
>>>>>>> really create one DataFrame for all these files and iterate over each
>>>>>>> row. Using wholeTextFiles seems to be a good approach for me, but I am
>>>>>>> not sure how to create a DataFrame from it. For example, is there a
>>>>>>> way to do something like below?
>>>>>>>
>>>>>>> def createDFProcess(myrdd):
>>>>>>>     df = sqlCtx.read.json(myrdd)
>>>>>>>     df.show()
>>>>>>>
>>>>>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
>>>>>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>>>>>
>>>>>>> The above code does not work. I get an error: "TypeError: 'JavaPackage'
>>>>>>> object is not callable". How do I make it work?
>>>>>>>
>>>>>>> Or is there a better approach?
>>>>>>>
>>>>>>> -Arun
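
The thread above leaves generateFileType and getSchemaFor to the reader as business/ETL logic. As a minimal, purely hypothetical sketch (the file-naming convention, type names, field names, and field types below are invented for illustration), they could look like this:

import os
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical convention: the file type is the prefix of the file name,
# e.g. 'hdfs:///input/dir1/orders_2016-10-06.json' -> 'orders'.
def generateFileType(filename):
    return os.path.basename(filename).split('_')[0]

# Hypothetical per-type schemas; replace with the real layout of each file type.
_SCHEMAS = {
    'orders': StructType([
        StructField('order_id', StringType()),
        StructField('amount', DoubleType()),
    ]),
    'customers': StructType([
        StructField('customer_id', StringType()),
        StructField('name', StringType()),
    ]),
}

def getSchemaFor(file_type):
    return _SCHEMAS[file_type]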
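
Building on that, here is a runnable sketch of the approach described in the thread, assuming the helpers above and that each file holds one JSON record per line (that parsing step, and the input path, are assumptions, not part of the original discussion). It also illustrates why the original attempt failed: sqlCtx is backed by the JVM on the driver and cannot be called inside a function shipped to executors (the usual cause of the "'JavaPackage' object is not callable" error), so the per-type DataFrames are built in a driver-side loop instead.

# Assumes sc (SparkContext) and sqlCtx (SQLContext) already exist,
# as they do in the pyspark shell.

# (filename, file_type, content) for every file under the input directory.
rdd1 = sc.wholeTextFiles('/input/dir1') \
         .map(lambda t: (t[0], generateFileType(t[0]), t[1]))
rdd1.cache()  # reused once per file type below

# Only the small list of distinct type tags reaches the driver,
# not the file contents.
types = rdd1.map(lambda t: t[1]).distinct().collect()

DFList = []
for k in types:
    # k=k binds the current type into the lambda; otherwise every filter
    # would see the last value of k by the time the jobs actually run.
    json_lines = (rdd1.filter(lambda t, k=k: t[1] == k)
                      .flatMap(lambda t: t[2].splitlines()))
    # read.json accepts an RDD of JSON strings; the schema comes from the helper.
    df = sqlCtx.read.schema(getSchemaFor(k)).json(json_lines)
    DFList.append(df)

for df in DFList:
    df.show()

Although the loop over file types runs sequentially on the driver, each DataFrame is still computed by Spark across all files of that type in parallel, which is the point made in the thread about the for loop not processing files one by one.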