Hi

I think you are correct direction. What is missing is: You do not need to
create DF for each file. You can scramble files with similar structures
together (by doing some filter on file name) and then create a DF for each
type of file. Also, creating DF on wholeTextFile seems wasteful to me. I
would probably do it like this

rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
(t[0],generateFileType(t[0]),t[1])
types = rdd1.map(lambda t: t[1]).distinct().collect()

DFList = []

for k in types:
     df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
     DFList.append(df)



On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigd...@gmail.com> wrote:

> My Pyspark program is currently identifies the list of the files from a
> directory (Using Python Popen command taking hadoop fs -ls arguments).  For
> each file, a Dataframe is created and processed. This is sequeatial. How to
> process all files paralelly?  Please note that every file in the directory
> has different schema.  So, depending on the file name, different logic is
> used for each file. So, I cannot really create one Dataframe for all these
> files and iterate each row.  Using wholeTextFiles seems to be good approach
> for me.  But, I am not sure how to create DataFrame from this.  For
> example, Is there a way to do this way do something like below.
>
> def createDFProcess(myrdd):
>     df = sqlCtx.read.json(myrdd)
>     df.show()
>
> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>
> Above code does not work.  I get an error 'TypeError: 'JavaPackage' object
> is not callable'.  How to make it work?
>
> Or is there a better approach?
>
> -Arun
>
>


-- 
Best Regards,
Ayan Guha

Reply via email to