I am seeing really strange behaviour when loading data from an RDBMS. I tried a different approach to loading the data - I provided a partitioning column to make the read parallel:

val df_init = sqlContext.read.format("jdbc").options(
  Map(
    "url"             -> Configuration.dbUrl,
    "dbtable"         -> Configuration.dbTable,
    "driver"          -> Configuration.dbDriver,
    "partitionColumn" -> "Target",
    "lowerBound"      -> "30000",
    "upperBound"      -> "90000",
    "numPartitions"   -> Configuration.appPartitioning.toString
  )).load()

But what I get when I check the Storage tab of the UI is the following distribution:

Data Distribution on 7 Executors

Host                  Memory Usage                    Disk Usage
spark1.clust:56209    145.3 MB (16.1 GB Remaining)    0.0 B
10.2.0.4:50305        0.0 B    (37.2 GB Remaining)    0.0 B
spark5.clust:41822    112.0 B  (16.9 GB Remaining)    0.0 B
spark4.clust:56687    112.0 B  (16.9 GB Remaining)    0.0 B
spark3.clust:34263    0.0 B    (16.9 GB Remaining)    0.0 B
spark2.clust:43663    112.0 B  (16.9 GB Remaining)    0.0 B
spark0.clust:57445    112.0 B  (16.9 GB Remaining)    0.0 B

Almost all the data resides on one node; the rest is negligible. Any idea what might be wrong with this setup? I admit that the partitioning column is not uniformly distributed.
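As far as I understand, Spark splits the [lowerBound, upperBound] range into equal-width strides of the partition column, so a skewed "Target" means most rows fall into a few of the generated ranges. A minimal sketch of how I would verify that (it runs against the df_init above and triggers the full JDBC read, so it is only a one-off diagnostic; partitionSizes is just an illustrative name):

// Count the rows that ended up in each JDBC partition. With a skewed
// "Target" column, most rows should show up in very few partitions.
val partitionSizes = df_init.rdd
  .mapPartitionsWithIndex((idx, rows) => Iterator((idx, rows.size)))
  .collect()

partitionSizes.foreach { case (idx, n) => println(s"partition $idx: $n rows") }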
Later on during the computation I try to repartition the data frame, but the effect is that the data gets collected on one node:

val df_init = sqlContext.read.format("jdbc").options(
  Map(
    "url"             -> Configuration.dbUrl,
    "dbtable"         -> Configuration.dbTable,
    "driver"          -> Configuration.dbDriver,
    "partitionColumn" -> "Target",
    "lowerBound"      -> "30000",
    "upperBound"      -> "90000",
    // "numPartitions" -> Configuration.appPartitioning.toString
    "numPartitions"   -> "35"
  )).load()

df_init.cache()
df_init.registerTempTable("df_init")

val df = (if (Configuration.dataSubset) {
  val (loadingCondition, typeId) = if (args.length > 1) {
    (args(1), args(2))
  } else (Configuration.dataCondition, Configuration.dataType)
  sqlContext.sql(
    s"""SELECT statement ... Condition = '$loadingCondition'""".stripMargin)
} else {
  df_init
}).repartition(Configuration.appPartitioning)

df.persist()

It seems that none of these actually works as expected; I cannot get the data distributed across the cluster.
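One detail I am unsure about (so take this as a sketch, not a fix; balanced is just an illustrative name): repartition returns a new DataFrame and does not touch the blocks already cached by df_init.cache(), so the Storage tab keeps showing the skewed blocks of the initial load. Materialising the repartitioned frame and dropping the original would at least separate the two effects:

val balanced = df_init.repartition(Configuration.appPartitioning) // full shuffle into N partitions
balanced.persist()
balanced.count()    // persist() is lazy; an action is needed before blocks appear in the Storage tab
df_init.unpersist() // drop the skewed blocks cached by the initial load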
Could someone more experienced provide some hints on what might be wrong?

Thanks

On 14 July 2016 at 19:31, Jakub Stransky <stransky...@gmail.com> wrote:

> Hi Talebzadeh,
>
> sorry, I forgot to answer the last part of your question:
>
> At O/S level you should see many CoarseGrainedExecutorBackend through jps
> each corresponding to one executor. Are they doing anything?
>
> There is one worker with one executor busy and the rest are almost idle:
>
>   PID  USER   PR  NI     VIRT     RES    SHR  S   %CPU  %MEM     TIME+  COMMAND
>  9305  spark  20   0  30.489g  5.075g  22256  S    0.3  18.5   0:36.25  java
>
> The only busy one is using all 8 cores of the machine:
>
>   PID  USER   PR  NI     VIRT     RES    SHR  S   %CPU  %MEM     TIME+  COMMAND
>  9580  zdata  20   0  29.664g  0.021t   6836  S  676.7  79.4  40:08.61  java
>
> Thanks
> Jakub
>
> On 14 July 2016 at 19:22, Jakub Stransky <stransky...@gmail.com> wrote:
>
>> Hi Talebzadeh,
>>
>> we are using 6 worker machines.
>>
>> We are reading the data through sqlContext (DataFrame), as suggested in
>> the documentation, rather than via JdbcRDD.
>>
>> prop just specifies the user name, password, and driver class.
>>
>> Right after this data load we register it as a temp table:
>>
>> val df_init = sqlContext.read
>>   .jdbc(
>>     Configuration.dbUrl,   // url
>>     Configuration.dbTable, // table
>>     prop                   // connection properties
>>   )
>>
>> df_init.registerTempTable("df_init")
>>
>> Afterwards we do some data filtering, column selection and filtering of
>> some rows with sqlContext.sql("select statement here"),
>>
>> and after this selection we try to repartition the data in order to get
>> it distributed across the cluster, and that seems not to be working. We
>> then persist that filtered and selected DataFrame.
>>
>> The desired state is that the filtered DataFrame is distributed across
>> the nodes of the cluster.
>>
>> Jakub
>>
>> On 14 July 2016 at 19:03, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Hi Jakub,
>>>
>>> Sounds like one executor. Can you point out:
>>>
>>> 1. The number of slaves/workers you are running
>>> 2. Are you using JDBC to read data in?
>>> 3. Do you register the DF as a temp table and, if so, have you cached
>>>    the temp table?
>>>
>>> Sounds like only one executor is active and the rest are sitting idle.
>>>
>>> At O/S level you should see many CoarseGrainedExecutorBackend processes
>>> through jps, each corresponding to one executor. Are they doing
>>> anything?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>> LinkedIn:
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> Disclaimer: Use it at your own risk. Any and all responsibility for any
>>> loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary
>>> damages arising from such loss, damage or destruction.
>>>
>>> On 14 July 2016 at 17:18, Jakub Stransky <stransky...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a Spark cluster running in standalone mode, master + 6
>>>> executors.
>>>>
>>>> My application reads data from a database via DataFrame.read; then
>>>> there is some filtering of rows. After that I re-partition the data,
>>>> and I wonder why on the Executors page of the driver UI I see all RDD
>>>> blocks still allocated on a single executor machine.
>>>>
>>>> [image: Inline images 1]
>>>>
>>>> As highlighted in the picture above, I expected that after the
>>>> re-partition the data would be shuffled across the cluster, but that
>>>> is obviously not happening here.
>>>>
>>>> I can understand that the database read happens in a non-parallel
>>>> fashion, but the re-partition should fix that, as far as I understand.
>>>>
>>>> Could someone experienced clarify this?
>>>>
>>>> Thanks
>>>>
>>>
>>
>
-- 
Jakub Stransky
cz.linkedin.com/in/jakubstransky