Hi, I think you are heading in the right direction. What is missing is: you do not need to create a DF for each file. You can group files with similar structures together (by filtering on the file name) and then create one DF per type of file. Also, creating a DF directly on the wholeTextFiles output seems wasteful to me. I would probably do it like this:
rdd1 = (sc.wholeTextFiles(inputpath)
          .map(lambda t: (t[0], generateFileType(t[0]), t[1]))
          .cache())   # cached because it is scanned once per file type below

types = rdd1.map(lambda t: t[1]).distinct().collect()

DFList = []
for k in types:
    # bind k via a default argument so each filter keeps its own value of k
    df = rdd1.filter(lambda t, k=k: t[1] == k).toDF(schema=getSchemaFor(k))
    DFList.append(df)

On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigd...@gmail.com> wrote:
> My Pyspark program currently identifies the list of files in a directory
> (using the Python Popen command with hadoop fs -ls arguments). For each
> file, a DataFrame is created and processed. This is sequential. How can I
> process all files in parallel? Please note that every file in the
> directory has a different schema, so depending on the file name, different
> logic is used for each file. That means I cannot really create one
> DataFrame for all these files and iterate over each row. Using
> wholeTextFiles seems to be a good approach for me, but I am not sure how
> to create a DataFrame from it. For example, is there a way to do
> something like below?
>
> def createDFProcess(myrdd):
>     df = sqlCtx.read.json(myrdd)
>     df.show()
>
> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>
> The above code does not work. I get an error: TypeError: 'JavaPackage'
> object is not callable. How do I make it work?
>
> Or is there a better approach?
>
> -Arun

--
Best Regards,
Ayan Guha
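For completeness, here is a minimal sketch of the two helpers the snippet above assumes. generateFileType and getSchemaFor are placeholder names from the reply, not Spark APIs, and the filename prefixes below are purely illustrative:

from pyspark.sql.types import StructType, StructField, StringType

def generateFileType(path):
    # Map a file path to a logical "type" based on its name.
    # The prefixes used here are made-up examples.
    name = path.rsplit('/', 1)[-1]
    if name.startswith('orders'):
        return 'orders'
    if name.startswith('customers'):
        return 'customers'
    return 'unknown'

def getSchemaFor(ftype):
    # rdd1 holds (fname, ftype, fcontent) triples, so the DataFrame schema
    # is three string columns regardless of type; per-type differences would
    # only appear once fcontent is parsed (e.g. with sqlCtx.read.json).
    return StructType([
        StructField('fname', StringType(), True),
        StructField('ftype', StringType(), True),
        StructField('fcontent', StringType(), True),
    ])

Each df in DFList can then be handled with its own per-type logic, for example by turning the fcontent column back into an RDD of strings and feeding it to sqlCtx.read.json, instead of looping over the files one at a time.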