Thanks Ayan.  Couple of questions:

1) How does generateFileType and getSchemaFor functions look like?
2) 'For loop' is processing files sequentially, right? my requirement is to
process all files at same time.

On Thu, Oct 6, 2016 at 8:52 AM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> In this case, if you see, t[1] is NOT the file content, as I have added a
> "FileType" field. So, this collect is just bringing in the list of file
> types, should be fine
>
> On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <arunp.bigd...@gmail.com>
> wrote:
>
>> Thanks Ayan.  I am really concerned about the collect.
>>
>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>
>> This will ship all files on to the driver, right?  It must be
>> inefficient.
>>
>> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <guha.a...@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I think you are correct direction. What is missing is: You do not need
>>> to create DF for each file. You can scramble files with similar structures
>>> together (by doing some filter on file name) and then create a DF for each
>>> type of file. Also, creating DF on wholeTextFile seems wasteful to me. I
>>> would probably do it like this
>>>
>>> rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
>>> (t[0],generateFileType(t[0]),t[1])
>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>
>>> DFList = []
>>>
>>> for k in types:
>>>      df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
>>>      DFList.append(df)
>>>
>>>
>>>
>>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigd...@gmail.com>
>>> wrote:
>>>
>>>> My Pyspark program is currently identifies the list of the files from a
>>>> directory (Using Python Popen command taking hadoop fs -ls arguments).  For
>>>> each file, a Dataframe is created and processed. This is sequeatial. How to
>>>> process all files paralelly?  Please note that every file in the directory
>>>> has different schema.  So, depending on the file name, different logic is
>>>> used for each file. So, I cannot really create one Dataframe for all these
>>>> files and iterate each row.  Using wholeTextFiles seems to be good approach
>>>> for me.  But, I am not sure how to create DataFrame from this.  For
>>>> example, Is there a way to do this way do something like below.
>>>>
>>>> def createDFProcess(myrdd):
>>>>     df = sqlCtx.read.json(myrdd)
>>>>     df.show()
>>>>
>>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname', 'fcontent'])
>>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>>
>>>> Above code does not work.  I get an error 'TypeError: 'JavaPackage'
>>>> object is not callable'.  How to make it work?
>>>>
>>>> Or is there a better approach?
>>>>
>>>> -Arun
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>

Reply via email to