For case 1, you can create 3 notebooks and 3 jobs in Databricks and then run them in parallel.
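If you want to drive those runs from a single orchestration notebook instead of scheduling them separately in the Jobs UI, something like the sketch below should work. It is untested against your workspace: the notebook paths and the year/month arguments are just placeholders, and dbutils.notebook.run is only available from inside a Databricks notebook.

    from concurrent.futures import ThreadPoolExecutor

    # Hypothetical notebook paths - replace with your own per-table notebooks.
    notebook_paths = [
        "/Shared/reports/count_table_a",
        "/Shared/reports/count_table_b",
        "/Shared/reports/count_table_c",
    ]

    def run_notebook(path):
        # dbutils is only defined inside a Databricks notebook.
        # 3600 is the per-run timeout in seconds; the arguments dict is whatever
        # parameters/widgets your child notebooks expect.
        return dbutils.notebook.run(path, 3600, {"year": "2019", "month": "12"})

    # Plain threads are enough here: each call just blocks while its child
    # notebook runs, so the three runs proceed concurrently.
    with ThreadPoolExecutor(max_workers=len(notebook_paths)) as executor:
        results = list(executor.map(run_notebook, notebook_paths))

    print(results)

Each dbutils.notebook.run call returns whatever the child notebook passes to dbutils.notebook.exit, so the children can hand their counts back to the driver notebook if you want to collect them in one place.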
On Wed, 22 Jan 2020 at 3:50 am, anbutech <anbutec...@outlook.com> wrote:

> Hi sir,
>
> Could you please help me with the two cases below? In Databricks PySpark
> we are processing terabytes of json data read from an aws s3 bucket.
>
> Case 1:
>
> Currently I'm reading multiple tables sequentially to get the day count
> from each table.
>
> For example, table_list.csv has one column with multiple table names:
>
> from pyspark.sql.functions import expr, lit
>
> year = 2019
> month = 12
>
> tablesDF = (spark.read.format("csv")
>             .option("header", "false")
>             .load("s3a://bucket//source/table_list.csv"))
> tabList = tablesDF.toPandas().values.tolist()
>
> for table in tabList:
>     tab_name = table[0]
>
>     # Snowflake settings and Snowflake table count()
>     sfOptions = {
>         "sfURL": "",
>         "sfAccount": "",
>         "sfUser": "",
>         "sfPassword": "",
>         "sfDatabase": "",
>         "sfSchema": "",
>         "sfWarehouse": "",
>     }
>
>     # Read data as a dataframe
>     sfxdf = (spark.read
>              .format("snowflake")
>              .options(**sfOptions)
>              .option("query", "select y as year, m as month, count(*) as sCount "
>                      "from {} where y={} and m={} group by year, month"
>                      .format(tab_name, year, month))
>              .load())
>
>     # Databricks Delta Lake
>     dbxDF = spark.sql("select y as year, m as month, count(*) as dCount "
>                       "from db.{} where y={} and m={} group by year, month"
>                       .format(tab_name, year, month))
>
>     resultDF = (dbxDF.join(sfxdf, on=['year', 'month'], how='left_outer')
>                 .na.fill(0)
>                 .withColumn("flag_col", expr("dCount == sCount")))
>
>     finalDF = (resultDF.withColumn("table_name", lit(tab_name))
>                .select("table_name", "year", "month", "dCount", "sCount", "flag_col"))
>
>     finalDF.coalesce(1).write.format('csv').option('header', 'true').save("s3a://outputs/reportcsv")
>
> Questions:
>
> 1) Instead of running the count query sequentially, one table at a time,
> how can I read all the tables listed in the csv file on s3 in parallel and
> distribute the jobs across the cluster?
>
> 2) Could you please advise how to optimize the above code in PySpark so
> that all the count queries run in parallel at the same time?
>
> Case 2:
>
> Multiprocessing case:
> ------------------------
>
> Could you please help me achieve multiprocessing for the above PySpark
> query so it runs in parallel in the distributed environment?
>
> Is there any way to achieve parallel processing of the PySpark code on
> the cluster using the snippet below?
>
> import time
> import multiprocessing
> import pandas as pd
>
> # Creating a pool of 20 processes. You can set this as per your intended
> # parallelism and your available resources.
> start = time.time()
> pool = multiprocessing.Pool(20)
>
> # This executes get_counts() in parallel, once for each element of input_paths.
> # result (a list of dictionaries) is built when all executions are completed.
> result = pool.map(get_counts, input_paths)
>
> end = time.time()
>
> result_df = pd.DataFrame(result)
> # You can use result_df.to_csv() to store the results in a csv.
> print(result_df)
> print('Time taken: {}'.format(end - start))

--
Best Regards,
Ayan Guha