It depends a  bit on the data as well, but have you investigated in SparkUI 
which executor/task becomes slowly?

Could it be also the database from which you load data?

> Am 18.07.2020 um 17:00 schrieb Yong Yuan <yyuankm1...@gmail.com>:
> 
> 
> The spark job has the correct functions and logic. However, after several 
> hours running, it becomes slower and slower. Are there some pitfalls in the 
> below code? Thanks!
> 
> 
> val query = "(select * from meta_table) as meta_data"    
> val meta_schema = new StructType()         
>        .add("config_id", BooleanType)         
>        .add("threshold", LongType)         
> var meta_df = spark.read.jdbc(url, query, connectionProperties)         
> var meta_df_explode=meta_df.select(col("id"), from_json(col("config"), 
> meta_schema).as("config")).select("config_id", "thresold", "config.*")  
> 
> //rules_imsi_df: joining of kafka ingestion with the meta_df_explode 
> 
> //rules_monitoring_df: static dataframe for monitoring purpose   
> 
> val rules_monitoring_stream =        
>             rules_imsi_df.writeStream           
>                         .outputMode("append")  
>                           .format("memory")
>                         .trigger(Trigger.ProcessingTime("120  seconds"))
>                          .foreachBatch {                  
>                               (batchDF: DataFrame, batchId: Long) =>
>                                     if(!batchDF.isEmpty)                      
>                                {    
> 
> printf("At %d, the microbatch has %d records \n", Instant.now.getEpochSecond, 
> batchDF.count())                                                             
> batchDF.show()                                                            
>  batchDF.persist()                                                            
>     var batchDF_group = 
> batchDF.groupBy("id").sum("volume").withColumnRenamed("sum(volume)", 
> "total_volume_id")                  
> rules_monitoring_df = rules_monitoring_df.join(batchDF_group, 
> rules_monitoring_df("id") === batchDF_group("id"), 
> "left").select(rules_monitoring_df("id"), 
> batchDF_group("total_volume_id")).na.fill(0)                         
> rules_monitoring_df = rules_monitoring_df.withColumn("volume", 
> rules_monitoring_df("volume")+rules_monitoring_df("total_volume_id"))         
>                                                       batchDF.unpersist()     
>                                                                      }        
>                                   }.start()    
> 
> 
>       while(rules_monitoring_stream.isActive)    {                  
> Thread.sleep(240000)                      
> ... //Periodically load meta data from database          
> meta_df = spark.read.jdbc(url, query, connectionProperties)              
> meta_df_explode=meta_df.select(col("id"), from_json(col("config"), 
> meta_schema).as("config")).select("config_id", "thresold", "config.*")        
>  
> } 
> 
> 
> 
> 
> In addition to the code, the yarn-sites.xml is configured as below. 
> 
> yarn.nodemanager.pmem-check-enabled, false
> yarn.nodemanager.localizer.cache.target-size-mb, 5120
> yarn.nodemanager.localizer.cache.cleanup.interval-ms, 400000
> yarn.nodemanager.vmem-check-enabled, false
> yarn.nodemanager.disk-health-checker.enable,true
> yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage,95.0
> yarn.log-aggregation.retain-seconds,36000
> 
> 
> 
> The spark-submit command is as below. 
> 
> spark-submit --driver-memory 5G --num-executors 3 --executor-memory 6G 
> --executor-cores 2 --files client_jaas.conf,cacerts,krb5.conf,service.keytab 
> --driver-java-options "-Djava.security.auth.login.config=./client_jaas.conf 
> -Djava.security.krb5.conf=./krb5.conf" --conf 
> "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf
>  -Djava.security.krb5.conf=./krb5.conf" --conf 
> "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf
>  -Djava.security.krb5.conf=./krb5.conf"  --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 
> sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar
> 
> 
> I am running the job in AWS EMR with 2 m4.xlarge. 
> 
> Thanks!

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

Reply via email to