Hi

Sorry for confusion, but I meant those functions to be written by you.
Those are you r business logic or etl logic
On 10 Oct 2016 21:06, "Arun Patel" <arunp.bigd...@gmail.com> wrote:

> Ayan, which version of Python are you using? I am using 2.6.9 and I don't
> find generateFileType and getSchemaFor functions.  Thanks for your help.
>
> On Fri, Oct 7, 2016 at 1:17 AM, ayan guha <guha.a...@gmail.com> wrote:
>
>> Hi
>>
>> generateFileType (filename) returns FileType
>>
>> getSchemaFor(FileType) returns schema for FileType
>>
>> This for loop DOES NOT process files sequentially. It creates dataframes
>> on all files which are of same types sequentially.
>>
>> On Fri, Oct 7, 2016 at 12:08 AM, Arun Patel <arunp.bigd...@gmail.com>
>> wrote:
>>
>>> Thanks Ayan.  Couple of questions:
>>>
>>> 1) How does generateFileType and getSchemaFor functions look like?
>>> 2) 'For loop' is processing files sequentially, right? my requirement is
>>> to process all files at same time.
>>>
>>> On Thu, Oct 6, 2016 at 8:52 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> In this case, if you see, t[1] is NOT the file content, as I have added
>>>> a "FileType" field. So, this collect is just bringing in the list of file
>>>> types, should be fine
>>>>
>>>> On Thu, Oct 6, 2016 at 11:47 PM, Arun Patel <arunp.bigd...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks Ayan.  I am really concerned about the collect.
>>>>>
>>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>>
>>>>> This will ship all files on to the driver, right?  It must be
>>>>> inefficient.
>>>>>
>>>>> On Thu, Oct 6, 2016 at 7:58 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I think you are correct direction. What is missing is: You do not
>>>>>> need to create DF for each file. You can scramble files with similar
>>>>>> structures together (by doing some filter on file name) and then create a
>>>>>> DF for each type of file. Also, creating DF on wholeTextFile seems 
>>>>>> wasteful
>>>>>> to me. I would probably do it like this
>>>>>>
>>>>>> rdd1 = sc.wholeTextFile(inputpath).map(lambda t:
>>>>>> (t[0],generateFileType(t[0]),t[1])
>>>>>> types = rdd1.map(lambda t: t[1]).distinct().collect()
>>>>>>
>>>>>> DFList = []
>>>>>>
>>>>>> for k in types:
>>>>>>      df = rdd1.filter(lambda t: t[1]==k).toDF(schema=getSchemaFor(k))
>>>>>>      DFList.append(df)
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Oct 6, 2016 at 10:26 PM, Arun Patel <arunp.bigd...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> My Pyspark program is currently identifies the list of the files
>>>>>>> from a directory (Using Python Popen command taking hadoop fs -ls
>>>>>>> arguments).  For each file, a Dataframe is created and processed. This 
>>>>>>> is
>>>>>>> sequeatial. How to process all files paralelly?  Please note that every
>>>>>>> file in the directory has different schema.  So, depending on the file
>>>>>>> name, different logic is used for each file. So, I cannot really create 
>>>>>>> one
>>>>>>> Dataframe for all these files and iterate each row.  Using 
>>>>>>> wholeTextFiles
>>>>>>> seems to be good approach for me.  But, I am not sure how to create
>>>>>>> DataFrame from this.  For example, Is there a way to do this way do
>>>>>>> something like below.
>>>>>>>
>>>>>>> def createDFProcess(myrdd):
>>>>>>>     df = sqlCtx.read.json(myrdd)
>>>>>>>     df.show()
>>>>>>>
>>>>>>> whfiles = sc.wholeTextFiles('/input/dir1').toDF(['fname',
>>>>>>> 'fcontent'])
>>>>>>> whfiles.map(lambda file: file.fcontent).foreach(createDFProcess)
>>>>>>>
>>>>>>> Above code does not work.  I get an error 'TypeError: 'JavaPackage'
>>>>>>> object is not callable'.  How to make it work?
>>>>>>>
>>>>>>> Or is there a better approach?
>>>>>>>
>>>>>>> -Arun
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ayan Guha
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Ayan Guha
>>>>
>>>
>>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>

Reply via email to