Sengupta
further to this, if you try the following notebook in Databricks cloud, it
will read a .csv file, write it to a parquet file and read it back (just to
count the number of rows stored).
Please note that the path to the csv file might differ for you.....
So, what you will need to do is
1 - create an account at community.cloud.databricks.com
2 - upload the .csv file via the Data section of your private Databricks cluster
3 - run the script; it will store the data on the distributed filesystem
of the Databricks cloud (DBFS)

It's worth investing time in this free Databricks cloud, as it can create a
cluster for you with minimal effort, and it's a very easy way to test your
Spark scripts on a real cluster.

hope this helps
kr

##################################
from random import randint
from time import sleep
import logging

from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
logger.addHandler(ch)


def read_parquet_file(parquetFileName, sqlContext):
    logger.info('Reading back the parquet file we just created...: %s', parquetFileName)
    parquet_data = sqlContext.read.parquet(parquetFileName)
    logger.info('Parquet file has %s rows', parquet_data.count())


def dataprocessing(filePath, count, sqlContext):
    logger.info('Iter count is: %s', count)
    if count == 0:
        print('exiting')
    else:
        df_traffic_tmp = sqlContext.read.format("csv").option("header", 'true').load(filePath)
        logger.info('#############################DataSet has: %s rows', df_traffic_tmp.count())
        logger.info('Writing to a parquet file')
        parquetFileName = "dbfs:/myParquetDf2.parquet"
        # overwrite so repeated iterations do not fail because the path already exists
        df_traffic_tmp.write.mode("overwrite").parquet(parquetFileName)
        sleepInterval = randint(10, 100)
        logger.info('#############################Sleeping for %s seconds', sleepInterval)
        sleep(sleepInterval)
        read_parquet_file(parquetFileName, sqlContext)
        dataprocessing(filePath, count - 1, sqlContext)


filename = '/FileStore/tables/wb4y1wrv1502027870004/tree_addhealth.csv'  # This path might differ for you
iterations = 1
logger.info('----------------------')
logger.info('Filename: %s', filename)
logger.info('Iterations: %s', iterations)
logger.info('----------------------')

logger.info('........Starting Spark..........Loading from %s for %s iterations', filename, iterations)
logger.info('Starting up....')
spark = SparkSession.builder.appName("Data Processing").getOrCreate()
logger.info('Initializing sqlContext')
sqlContext = SQLContext(spark.sparkContext)
dataprocessing(filename, iterations, sqlContext)
logger.info('Out of here..')
######################################
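
Once it has run, a quick way to sanity-check the parquet output from a notebook
cell (a minimal sketch; dbutils and display are only available inside Databricks
notebooks, and the path matches the one hard-coded in the script above):

# list the parquet part files the script wrote to DBFS
display(dbutils.fs.ls("dbfs:/myParquetDf2.parquet"))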


On Sat, Aug 5, 2017 at 9:09 PM, Marco Mistroni <mmistr...@gmail.com> wrote:

> Uh believe me there are lots of ppl on this list who will send u code
> snippets if u ask... 😀
>
> Yes, that is what Steve pointed out, suggesting also that for such a simple
> exercise you should perform all operations on a Spark standalone instead
> (or, alternatively, use an NFS mount on the cluster).
> I'd agree with his suggestion....
> I suggest u another alternative:
> https://community.cloud.databricks.com/
>
> That's a ready-made cluster and you can run your spark app there as well as
> store data on the cluster (well, I haven't tried it myself but I assume it's
> possible).   Try that out... I will try ur script there as I have an
> account there (though I guess you'll get there before me.....)
>
> Try that out and let me know if u get stuck....
> Kr
>
> On Aug 5, 2017 8:40 PM, "Gourav Sengupta" <gourav.sengu...@gmail.com>
> wrote:
>
>> Hi Marco,
>>
>> For the first time in several years, FOR THE VERY FIRST TIME, I am seeing
>> someone actually executing code and providing a response. It feels wonderful
>> that at least someone considered responding by executing code and did not
>> filter out each and every technical detail to brood only on my superb social
>> skills, while claiming the reason for ignoring the technical details is that
>> they are elementary. I think that Steve is also the first person who could
>> answer the WHY of an elementary question, instead of saying that is just how
>> it is, and who pointed to the correct documentation.
>>
>> That code works fantastically. But the problem I have been trying to pin
>> down is with writing out the data, not reading it.
>>
>>
>> So, if you try to read the data from a folder which has the
>> same file across all the nodes, then it will work fine. In fact, that is what
>> should work.
>>
>> What does not work is writing the file back and then reading it
>> once again from the location you have written it to; that is when the
>> issue starts happening.
>>
>> Therefore, if in my code you save the pandas dataframe as a CSV
>> file and then read it back, you will make the following observations:
>>
>> THE FOLLOWING WILL FAIL SINCE THE FILE IS NOT ON ALL THE NODES
>> ------------------------------------------------------------------------
>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD'))
>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv", header=True, sep=",", index=0)
>> testdf = spark.read.load("/Users/gouravsengupta/Development/spark/sparkdata/testdir/")
>> testdf.cache()
>> testdf.count()
>> ------------------------------------------------------------------------
>>
>>
>> THE FOLLOWING WILL WORK, BUT THE PROCESS WILL NOT USE THE NODES ON WHICH
>> THE DATA DOES NOT EXIST
>> ------------------------------------------------------------------------
>> pandasdf = pandas.DataFrame(numpy.random.randn(10000, 4), columns=list('ABCD'))
>> pandasdf.to_csv("/Users/gouravsengupta/Development/spark/sparkdata/testdir/test.csv", header=True, sep=",", index=0)
>> testdf = spark.read.load("file:///Users/gouravsengupta/Development/spark/sparkdata/testdir/")
>> testdf.cache()
>> testdf.count()
>> ------------------------------------------------------------------------
>>
>>
>> If you execute my code, you will also, surprisingly, see that the writes on
>> the nodes which are not the master node never finish moving the files from
>> the _temporary folder to the main one.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>>
>> On Fri, Aug 4, 2017 at 9:45 PM, Marco Mistroni <mmistr...@gmail.com>
>> wrote:
>>
>>> Hello
>>>  please have a look at this. it's a simple script that just reads a
>>> dataframe n times, sleeping for a random interval between reads. i used it
>>> to test memory issues that another user was experiencing on a spark cluster
>>>
>>> you should run it like this, e.g.
>>> spark-submit dataprocessing_Sample-2.py <path to tree_addhealth.csv>
>>> <num of iterations>
>>>
>>> i ran it on the cluster like this
>>>
>>> ./spark-submit --master spark://ec2-54-218-113-119.us-west-2.compute.amazonaws.com:7077 /root/pyscripts/dataprocessing_Sample-2.py file:///root/pyscripts/tree_addhealth.csv
>>>
>>> hth, ping me back if you have issues
>>> i do agree with Steve's comments.... if you want to test your spark
>>> scripts just for playing, do it on a standalone server on your localhost.
>>> Moving to a cluster is just a matter of deploying your script and making sure
>>> you have a common place to read and store the data..... SysAdmin
>>> should give you this when they set up the cluster...
>>>
>>> kr
>>>
>>>
>>>
>>>
>>> On Fri, Aug 4, 2017 at 4:50 PM, Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
>>>> Hi Marco,
>>>>
>>>> I am sincerely obliged for your kind time and response. Can you please
>>>> try the solution that you have so kindly suggested?
>>>>
>>>> It will be a lot of help if you could kindly execute the code that I
>>>> have given. I don't think that anyone has yet.
>>>>
>>>> There are lots of fine responses to my question here, but if you read
>>>> the last response from Simon, it comes the closest to being satisfactory. I
>>>> am sure even he did not execute the code, but at least he came quite close
>>>> to understanding what the problem is.
>>>>
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>>
>>>> On Thu, Aug 3, 2017 at 7:59 PM, Marco Mistroni <mmistr...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello
>>>>>  my 2 cents here, hope it helps
>>>>> If you just want to play around with Spark, i'd leave Hadoop out,
>>>>> it's an unnecessary dependency that you don't need for just running a python
>>>>> script
>>>>> Instead do the following:
>>>>> - go to the root of your master / slave nodes and create a directory
>>>>> /root/pyscripts
>>>>> - place your csv file there as well as the python script
>>>>> - run the script that replicates the whole directory across the cluster
>>>>> (i believe it's called copy-script.sh)
>>>>> - then run your spark-submit, it will be something like
>>>>>     ./spark-submit /root/pyscripts/mysparkscripts.py file:///root/pyscripts/tree_addhealth.csv 10 --master spark://ip-172-31-44-155.us-west-2.compute.internal:7077
>>>>> - in your python script, as part of your processing, write the parquet
>>>>> file in the directory /root/pyscripts (a minimal sketch follows just below)
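>>>>>
>>>>> That last step would look roughly like this (a minimal sketch; the file
>>>>> and output names are just examples):
>>>>>
>>>>> from pyspark.sql import SparkSession
>>>>> spark = SparkSession.builder.appName("pyscripts-demo").getOrCreate()
>>>>> df = spark.read.format("csv").option("header", "true").load("file:///root/pyscripts/tree_addhealth.csv")
>>>>> # write the parquet output into the same replicated directory
>>>>> df.write.mode("overwrite").parquet("file:///root/pyscripts/tree_addhealth.parquet")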
>>>>>
>>>>> If you have an AWS account and you are versatile with that - you need
>>>>> to set up bucket permissions etc. - you can just
>>>>> - store your file in one of your S3 buckets
>>>>> - create an EMR cluster
>>>>> - connect to the master or a slave
>>>>> - run your script that reads from the s3 bucket and writes to the same
>>>>> s3 bucket (see the sketch below)
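>>>>>
>>>>> A minimal sketch of that s3 round trip (the bucket name is just a
>>>>> placeholder, and it assumes the EMR cluster's s3 access is already set up):
>>>>>
>>>>> from pyspark.sql import SparkSession
>>>>> spark = SparkSession.builder.appName("s3-roundtrip").getOrCreate()
>>>>> df = spark.read.format("csv").option("header", "true").load("s3://my-bucket/tree_addhealth.csv")
>>>>> df.write.mode("overwrite").parquet("s3://my-bucket/tree_addhealth_parquet/")
>>>>> print(spark.read.parquet("s3://my-bucket/tree_addhealth_parquet/").count())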
>>>>>
>>>>>
>>>>> Feel free to mail me privately, i have a working script that i have used
>>>>> to test some code on a spark standalone cluster
>>>>> hth
>>>>>
>>>>>
>>>>> On Thu, Aug 3, 2017 at 10:30 AM, Gourav Sengupta <
>>>>> gourav.sengu...@gmail.com> wrote:
>>>>>
>>>>>> Hi Steve,
>>>>>>
>>>>>> I love you mate, thanks a ton once again for ACTUALLY RESPONDING.
>>>>>>
>>>>>> I am now going through the documentation (https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md)
>>>>>> and it makes much much more sense now.
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>> On Thu, Aug 3, 2017 at 10:09 AM, Steve Loughran <
>>>>>> ste...@hortonworks.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> On 2 Aug 2017, at 20:05, Gourav Sengupta <gourav.sengu...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Steve,
>>>>>>>
>>>>>>> I have written a sincere note of apology to everyone in a separate
>>>>>>> email. I sincerely request your kind forgiveness beforehand if anything
>>>>>>> in my emails sounds impolite.
>>>>>>>
>>>>>>> Let me first start by thanking you.
>>>>>>>
>>>>>>> I know it looks like I formed all my opinions based on that document,
>>>>>>> but that is not the case at all. If you or anyone else executes the code
>>>>>>> that I have given, they will see what I mean. Code speaks louder and
>>>>>>> better than words for me.
>>>>>>>
>>>>>>> So I am not saying you are wrong. I am asking you to verify, and expecting
>>>>>>> that someone will be able to correct, a set of understandings that a moron
>>>>>>> like me has gained after long hours of not having anything better to do.
>>>>>>>
>>>>>>>
>>>>>>> SCENARIO: there are three files, file1.csv, file2.csv and file3.csv,
>>>>>>> stored in HDFS with replication 2, and there is a HADOOP cluster of three
>>>>>>> nodes. All these nodes have SPARK workers (executors) running on them.
>>>>>>> The files are stored in the following way:
>>>>>>> ------------------------------------------
>>>>>>> | SYSTEM 1   | SYSTEM 2   | SYSTEM 3   |
>>>>>>> | (worker1)  | (worker2)  | (worker3)  |
>>>>>>> | (master)   |            |            |
>>>>>>> ------------------------------------------
>>>>>>> | file1.csv  |            | file1.csv  |
>>>>>>> ------------------------------------------
>>>>>>> |            | file2.csv  | file2.csv  |
>>>>>>> ------------------------------------------
>>>>>>> | file3.csv  | file3.csv  |            |
>>>>>>> ------------------------------------------
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> CONSIDERATION ON WHICH THE ABOVE SCENARIO IS BASED:
>>>>>>> HDFS replication does not store the same file on all the nodes in
>>>>>>> the cluster. So if I have three nodes and the replication is two, then the
>>>>>>> same file will be stored physically on two nodes in the cluster. Does that
>>>>>>> sound right?
>>>>>>>
>>>>>>>
>>>>>>> HDFS breaks files up into blocks (default = 128MB). If a .csv file
>>>>>>> is > 128MB then it will be broken up into blocks
>>>>>>>
>>>>>>> file1.csv -> [block0001, block0002, block0003]
>>>>>>>
>>>>>>> and each block will be replicated. With replication = 2 there will
>>>>>>> be two copies of each block, but the file itself can span > 2 hosts.
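>>>>>>>
>>>>>>> A quick way to observe the effect of block splitting from PySpark (a
>>>>>>> minimal sketch; the hdfs:/// path is hypothetical) is to look at how many
>>>>>>> input partitions a read produces:
>>>>>>>
>>>>>>> from pyspark.sql import SparkSession
>>>>>>> spark = SparkSession.builder.getOrCreate()
>>>>>>> df = spark.read.format("csv").option("header", "true").load("hdfs:///data/file1.csv")
>>>>>>> # for an uncompressed csv this roughly tracks ceil(file size / 128MB)
>>>>>>> print(df.rdd.getNumPartitions())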
>>>>>>>
>>>>>>>
>>>>>>> ASSUMPTION (STEVE PLEASE CLARIFY THIS):
>>>>>>> If SPARK is trying to process the records, then I am expecting
>>>>>>> that WORKER2 should not be processing file1.csv; similarly, WORKER1
>>>>>>> should not be processing file2.csv and WORKER3 should not be processing
>>>>>>> file3.csv, because if WORKER2 were trying to process file1.csv it would
>>>>>>> cause unnecessary network transmission of the file.
>>>>>>>
>>>>>>>
>>>>>>> Spark prefers to schedule work locally, so as to save on network
>>>>>>> traffic, but it optimises for execution time rather than waiting for a
>>>>>>> free worker on the node with the data. If a block is on nodes 2 and 3 but
>>>>>>> there is only a free thread on node 1, then node 1 gets the work.
>>>>>>>
>>>>>>> There are details about whether/how work across blocks takes place which
>>>>>>> I'm avoiding. For now, know that formats which are "splittable" will have
>>>>>>> work scheduled by block. If you use Parquet/ORC/Avro for your data and
>>>>>>> compress with snappy, it will be split. This gives you maximum performance,
>>>>>>> as more than one thread can work on different blocks. That is, if file1 is
>>>>>>> split into three blocks, three worker threads can process it.
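>>>>>>>
>>>>>>> For example, writing snappy-compressed Parquet keeps the data splittable
>>>>>>> (a minimal sketch; the hdfs:/// output path and the demo dataframe are
>>>>>>> just placeholders):
>>>>>>>
>>>>>>> from pyspark.sql import SparkSession
>>>>>>> spark = SparkSession.builder.getOrCreate()
>>>>>>> df = spark.range(1000000)   # placeholder data
>>>>>>> # parquet + snappy stays splittable, so several tasks can scan one file in parallel
>>>>>>> df.write.mode("overwrite").option("compression", "snappy").parquet("hdfs:///tmp/demo_parquet")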
>>>>>>>
>>>>>>>
>>>>>>> ASSUMPTION BASED ON THE ABOVE ASSUMPTION (STEVE, ONCE AGAIN, PLEASE
>>>>>>> CLARIFY THIS):
>>>>>>> if WORKER2 is not processing file1.csv, then how does it matter
>>>>>>> whether the file is on that system at all? Should SPARK not just ask the
>>>>>>> workers to process the files which are available on the worker nodes?
>>>>>>> In case both WORKER2 and WORKER3 fail and are not available, then
>>>>>>> file2.csv will not be processed at all.
>>>>>>>
>>>>>>>
>>>>>>> locality is best-effort, not guaranteed.
>>>>>>>
>>>>>>>
>>>>>>> ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD
>>>>>>> BE EXECUTED (it's been pointed out that I am learning SPARK, and even I
>>>>>>> did not take more than 13 mins to set up the cluster and run the code).
>>>>>>>
>>>>>>> Once you execute the code you will find that:
>>>>>>> 1. if the path starts with file:/// while reading back, then no error is
>>>>>>> reported, but the number of records reported back includes only the
>>>>>>> records on the worker which is also running the master.
>>>>>>> 2. you will also notice that once you cache the file before writing, the
>>>>>>> partitions are distributed nicely across the workers; while writing back,
>>>>>>> the dataframe partitions are written properly on the worker node on the
>>>>>>> master, but the workers on the other systems have the files written to a
>>>>>>> _temporary folder which never gets copied back to the main folder.
>>>>>>> In spite of this the job is not reported as failed in SPARK.
>>>>>>>
>>>>>>>
>>>>>>> This gets into the "commit protocol". You don't want to know all the
>>>>>>> dirty details (*) but essentially it's this:
>>>>>>>
>>>>>>> 1. Every worker writes its output to a directory under the
>>>>>>> destination directory, something like
>>>>>>> '$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID'
>>>>>>> 2. it is the spark driver which "commits" the job by moving the
>>>>>>> output from the individual workers out of the temporary directories into
>>>>>>> $dest, then deleting $dest/_temporary
>>>>>>> 3. for which it needs to be able to list all the output in
>>>>>>> $dest/_temporary
>>>>>>>
>>>>>>> In your case, only the output on the same node as the driver is
>>>>>>> being committed, because only those files can be listed and moved. The
>>>>>>> output on the other nodes isn't seen, so it isn't committed, nor cleaned
>>>>>>> up.
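>>>>>>>
>>>>>>> A way to see this concretely (purely illustrative; the path below is a
>>>>>>> hypothetical local destination, not one from this thread) is to look at
>>>>>>> what the driver itself can list under the destination before the commit:
>>>>>>>
>>>>>>> # with a plain local path the driver can only list what is on its own disk,
>>>>>>> # so task output written on other machines never shows up here and is
>>>>>>> # never committed (nor cleaned up)
>>>>>>> import os
>>>>>>> dest = "/root/pyscripts/out.parquet"          # hypothetical destination
>>>>>>> tmp = os.path.join(dest, "_temporary")
>>>>>>> if os.path.isdir(tmp):
>>>>>>>     for attempt in os.listdir(tmp):
>>>>>>>         print("visible to the driver:", os.path.join(tmp, attempt))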
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Now in my own world, if I see the following things happening,
>>>>>>> something is going wrong (with me):
>>>>>>> 1. SPARK transfers files between systems to process them, instead
>>>>>>> of processing them locally (I do not have code to prove this, so it
>>>>>>> is just an assumption)
>>>>>>> 2. SPARK cannot determine when the writes are failing on standalone
>>>>>>> cluster workers and reports success (code is there for this)
>>>>>>> 3. SPARK reports back the number of records on the worker running on the
>>>>>>> master node when count() is called, without reporting an error, while
>>>>>>> using file:///, and reports an error when I mention the path without
>>>>>>> file:/// (for SPARK 2.1.x onwards, code is there for this)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> As everyone's been saying, file:// requires a shared filestore, with
>>>>>>> uniform paths everywhere. That's needed to list the files to process, read
>>>>>>> the files in the workers and commit the final output. NFS cross-mounting is
>>>>>>> the simplest way to do this, especially as for three nodes HDFS is
>>>>>>> overkill: more services to keep running, no real fault tolerance. Export a
>>>>>>> directory tree from one of the servers, give the rest access to it, and
>>>>>>> don't worry about bandwidth use, as the shared disk itself will become the
>>>>>>> bottleneck.
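>>>>>>>
>>>>>>> With such an export mounted at the same path on every node (below,
>>>>>>> /mnt/shared is purely a hypothetical mount point), the file:// reads and
>>>>>>> writes from the earlier snippets behave as expected. A minimal sketch:
>>>>>>>
>>>>>>> from pyspark.sql import SparkSession
>>>>>>> spark = SparkSession.builder.getOrCreate()
>>>>>>> # /mnt/shared must be mounted at this exact path on the driver and every worker
>>>>>>> df = spark.read.format("csv").option("header", "true").load("file:///mnt/shared/testdir/")
>>>>>>> df.write.mode("overwrite").parquet("file:///mnt/shared/testdir_parquet/")
>>>>>>> print(spark.read.parquet("file:///mnt/shared/testdir_parquet/").count())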
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> I very sincerely hope that, with your genuine help, the bar of language
>>>>>>> and social skills will be lowered for me, and that everyone will find a way
>>>>>>> to excuse me and not treat this email as a means to measure my extremely
>>>>>>> versatile and amazingly vivid social skills. It will be a lot of help to
>>>>>>> just focus on the facts related to machines, data, errors and (the language
>>>>>>> that I somehow understand better) code.
>>>>>>>
>>>>>>>
>>>>>>> My sincere apologies once again, as I am 100% sure that I did not
>>>>>>> meet the required social and language skills.
>>>>>>>
>>>>>>> Thanks a ton once again for your kindness, patience and
>>>>>>> understanding.
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Gourav Sengupta
>>>>>>>
>>>>>>>
>>>>>>> * for the curious, the details of the v1 and v2 commit protocols are
>>>>>>> https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md
>>>>>>>
>>>>>>> Like I said: you don't want to know the details, and you really
>>>>>>> don't want to step through Hadoop's FileOutputCommitter to see what's 
>>>>>>> going
>>>>>>> on. The Spark side is much easier to follow.
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
