I am seeing some really weird behavior when loading data from an RDBMS.

I tried a different approach for loading the data - I provided a partitioning
column to parallelize the read:

    val df_init = sqlContext.read.format("jdbc").options(
      Map("url" -> Configuration.dbUrl,
        "dbtable" -> Configuration.dbTable,
        "driver" -> Configuration.dbDriver,
        "partitionColumn"-> "Target",
        "lowerBound" -> "30000",
        "upperBound" -> "90000",
        "numPartitions" -> Configuration.appPartitioning.toString
      )).load()
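
As far as I understand, with these options Spark issues numPartitions parallel
queries, each covering one equal-width slice of [lowerBound, upperBound) on the
Target column; the bounds only set the stride, they do not filter rows, so the
first and last slices also pick up everything outside the range. A quick way to
check how many rows actually land in each JDBC partition (a sketch using the
df_init above):

    // Count rows per partition of the JDBC read to check for skew.
    val perPartition = df_init.rdd
      .mapPartitionsWithIndex { (idx, rows) =>
        Iterator((idx, rows.size))
      }
      .collect()

    perPartition.foreach { case (idx, n) =>
      println(s"partition $idx: $n rows")
    }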


But what I get when I check the Storage tab in the UI is the following
distribution:

Data Distribution on 7 Executors

    Host                 Memory Usage                   Disk Usage
    spark1.clust:56209   145.3 MB (16.1 GB Remaining)   0.0 B
    10.2.0.4:50305       0.0 B    (37.2 GB Remaining)   0.0 B
    spark5.clust:41822   112.0 B  (16.9 GB Remaining)   0.0 B
    spark4.clust:56687   112.0 B  (16.9 GB Remaining)   0.0 B
    spark3.clust:34263   0.0 B    (16.9 GB Remaining)   0.0 B
    spark2.clust:43663   112.0 B  (16.9 GB Remaining)   0.0 B
    spark0.clust:57445   112.0 B  (16.9 GB Remaining)   0.0 B

Almost all the data resides on one node; the rest is negligible. Any idea
what might be wrong with this setting? I admit that the partitioning field is
not uniformly distributed, but I would still expect the data to end up on more
than one executor.
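
Since the slices are equal-width over [30000, 90000), a skewed Target would put
most rows into a few slices. A quick sanity check of the actual distribution (a
sketch; only the column name and the bounds come from my setup, the rest is
plain Spark SQL):

    import org.apache.spark.sql.functions.{min, max}

    // Actual range of the partitioning column.
    df_init.agg(min("Target"), max("Target")).show()

    // Coarse histogram over the same equal-width slices Spark derives
    // from lowerBound/upperBound/numPartitions (35 here).
    df_init
      .selectExpr("floor((Target - 30000) / ((90000 - 30000) / 35)) AS slice")
      .groupBy("slice")
      .count()
      .show(40)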

Later on during the computation I try to repartition the data frames, but the
effect is that the data gets collected onto one node.

    val df_init = sqlContext.read.format("jdbc").options(
      Map("url" -> Configuration.dbUrl,
        "dbtable" -> Configuration.dbTable,
        "driver" -> Configuration.dbDriver,
        "partitionColumn"-> "Target",
        "lowerBound" -> "30000",
        "upperBound" -> "90000",
//        "numPartitions" -> Configuration.appPartitioning.toString
        "numPartitions" -> "35"
      )).load()

    df_init.cache()

    df_init.registerTempTable("df_init")

    val df = (if (Configuration.dataSubset) {
      val (loadingCondition, typeId) = if (args.length > 2)
        (args(1), args(2))
      else
        (Configuration.dataCondition, Configuration.dataType)

      sqlContext.sql(
        s"""SELECT statement ... Condition = '$loadingCondition'""".stripMargin)
    } else {
      df_init
    }).repartition(Configuration.appPartitioning)

    df.persist()

It seems that none of those actually work as expected; I cannot distribute
the data across the cluster. Could someone more experienced provide some hints
on what might be wrong?
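
One suspicion I have is the ordering of cache() and repartition(): df_init.cache()
materializes the blocks with the skewed layout of the JDBC read before the
repartition ever runs, so the Storage tab keeps showing the original
distribution. Would the following ordering be closer to the intended usage (a
sketch with the same names as above; the count() is only there to force
materialization, since caching is lazy)?

    // Repartition first, then persist, so that the materialized blocks
    // are already spread across the executors.
    val dfBalanced = df_init.repartition(Configuration.appPartitioning)
    dfBalanced.persist()

    // Caching is lazy; run an action so the Storage tab reflects the
    // repartitioned layout.
    dfBalanced.count()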

Thanks

On 14 July 2016 at 19:31, Jakub Stransky <stransky...@gmail.com> wrote:

> HI Talebzadeh,
>
> sorry I forget to answer last part of your question:
>
> At O/S level you should see many CoarseGrainedExecutorBackend through jps
> each corresponding to one executor. Are they doing anything?
>
> There is one worker with one executor busy and the rest are almost idle:
>
>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
>  9305 spark     20   0 30.489g 5.075g  22256 S   0.3 18.5   0:36.25 java
>
> The only busy one is using all 8 cores of the machine:
>
>   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
>  9580 zdata     20   0 29.664g 0.021t   6836 S 676.7 79.4   40:08.61 java
>
>
> Thanks
> Jakub
>
> On 14 July 2016 at 19:22, Jakub Stransky <stransky...@gmail.com> wrote:
>
>> HI Talebzadeh,
>>
>> we are using 6 worker machines, all running.
>>
>> We are reading the data through sqlContext (as a DataFrame), as suggested
>> in the documentation, rather than through JdbcRDD.
>>
>> prop just specifies the user name, password, and driver class.
>>
>> Right after this data load we register it as a temp table:
>>
>>     val df_init = sqlContext.read
>>       .jdbc(
>>         url = Configuration.dbUrl,
>>         table = Configuration.dbTable,
>>         prop
>>       )
>>
>>     df_init.registerTempTable("df_init")
>>
>> Afterwards we do some column selection and filter some rows with
>> sqlContext.sql("select statement here"),
>>
>> and after this selection we try to repartition the data in order to get
>> it distributed across the cluster, but that does not seem to be working.
>> Then we persist that filtered and selected DataFrame.
>>
>> The desired state is that the filtered DataFrame is distributed across
>> the nodes in the cluster.
>>
>> Jakub
>>
>>
>>
>> On 14 July 2016 at 19:03, Mich Talebzadeh <mich.talebza...@gmail.com>
>> wrote:
>>
>>> Hi Jakub,
>>>
>>> Sounds like one executor. Can you point out:
>>>
>>>
>>>    1. The number of slaves/workers you are running
>>>    2. Are you using JDBC to read data in?
>>>    3. Do you register the DF as a temp table and, if so, have you
>>>    cached the temp table?
>>>
>>> Sounds like only one executor is active and the rest are sitting idle.
>>>
>>> At O/S level you should see many CoarseGrainedExecutorBackend through
>>> jps each corresponding to one executor. Are they doing anything?
>>>
>>> HTH
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn:
>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 14 July 2016 at 17:18, Jakub Stransky <stransky...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I have a Spark cluster running in standalone mode, with a master + 6
>>>> executors.
>>>>
>>>> My application reads data from a database via DataFrame.read, and then
>>>> filters some rows. After that I re-partition the data, and I wonder why
>>>> the Executors page of the driver UI still shows all RDD blocks allocated
>>>> on a single executor machine.
>>>>
>>>> [image: Inline images 1]
>>>> As highlighted in the picture above, I expected that after the
>>>> re-partition the data would be shuffled across the cluster, but that is
>>>> obviously not happening here.
>>>>
>>>> I can understand that the database read happens in a non-parallel
>>>> fashion, but the re-partition should fix that, as far as I understand.
>>>>
>>>> Could someone experienced clarify that?
>>>>
>>>> Thanks
>>>>
>>>
>>>
>>
>>
>> --
>> Jakub Stransky
>> cz.linkedin.com/in/jakubstransky
>>
>>
>
>
> --
> Jakub Stransky
> cz.linkedin.com/in/jakubstransky
>
>


-- 
Jakub Stransky
cz.linkedin.com/in/jakubstransky
