It depends a bit on the data as well, but have you investigated in the Spark UI which executor/task becomes slow?
Could it also be the database from which you load the data?

> On 18.07.2020 at 17:00, Yong Yuan <yyuankm1...@gmail.com> wrote:
>
> The spark job has the correct functions and logic. However, after several
> hours running, it becomes slower and slower. Are there some pitfalls in the
> below code? Thanks!
>
> val query = "(select * from meta_table) as meta_data"
> val meta_schema = new StructType()
>   .add("config_id", BooleanType)
>   .add("threshold", LongType)
> var meta_df = spark.read.jdbc(url, query, connectionProperties)
> var meta_df_explode = meta_df.select(col("id"), from_json(col("config"),
>   meta_schema).as("config")).select("config_id", "threshold", "config.*")
>
> // rules_imsi_df: join of the Kafka ingestion with meta_df_explode
> // rules_monitoring_df: static dataframe for monitoring purposes
>
> val rules_monitoring_stream = rules_imsi_df.writeStream
>   .outputMode("append")
>   .format("memory")
>   .trigger(Trigger.ProcessingTime("120 seconds"))
>   .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>     if (!batchDF.isEmpty) {
>       printf("At %d, the microbatch has %d records \n",
>         Instant.now.getEpochSecond, batchDF.count())
>       batchDF.show()
>       batchDF.persist()
>       val batchDF_group = batchDF.groupBy("id").sum("volume")
>         .withColumnRenamed("sum(volume)", "total_volume_id")
>       rules_monitoring_df = rules_monitoring_df.join(batchDF_group,
>         rules_monitoring_df("id") === batchDF_group("id"), "left")
>         .select(rules_monitoring_df("id"), batchDF_group("total_volume_id"))
>         .na.fill(0)
>       rules_monitoring_df = rules_monitoring_df.withColumn("volume",
>         rules_monitoring_df("volume") + rules_monitoring_df("total_volume_id"))
>       batchDF.unpersist()
>     }
>   }.start()
>
> while (rules_monitoring_stream.isActive) {
>   Thread.sleep(240000)
>   ...
>   // Periodically reload the meta data from the database
>   meta_df = spark.read.jdbc(url, query, connectionProperties)
>   meta_df_explode = meta_df.select(col("id"), from_json(col("config"),
>     meta_schema).as("config")).select("config_id", "threshold", "config.*")
> }
>
> In addition to the code, yarn-site.xml is configured as below.
>
> yarn.nodemanager.pmem-check-enabled, false
> yarn.nodemanager.localizer.cache.target-size-mb, 5120
> yarn.nodemanager.localizer.cache.cleanup.interval-ms, 400000
> yarn.nodemanager.vmem-check-enabled, false
> yarn.nodemanager.disk-health-checker.enable, true
> yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage, 95.0
> yarn.log-aggregation.retain-seconds, 36000
>
> The spark-submit command is as below.
>
> spark-submit --driver-memory 5G --num-executors 3 --executor-memory 6G \
>   --executor-cores 2 \
>   --files client_jaas.conf,cacerts,krb5.conf,service.keytab \
>   --driver-java-options "-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
>   --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
>   --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=./client_jaas.conf -Djava.security.krb5.conf=./krb5.conf" \
>   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
>   sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar
>
> I am running the job in AWS EMR with 2 m4.xlarge.
>
> Thanks!

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
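If the database is a suspect, a cheap first check is to time the periodic reload inside the while loop and see whether it grows across iterations. A minimal sketch in plain Scala (the `timed` helper is ours, not part of the original job):

```scala
// Sketch of a hypothetical timing helper: wraps any expression, prints how
// long it took, and returns the expression's result unchanged.
def timed[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block // evaluate the wrapped expression
  val elapsedMs = (System.nanoTime() - start) / 1000000
  println(s"$label took $elapsedMs ms")
  result
}

// Stand-in workload; in the job it would wrap the JDBC read, e.g.
//   meta_df = timed("meta reload") { spark.read.jdbc(url, query, connectionProperties) }
val n = timed("sum to 1e6") { (1L to 1000000L).sum } // n == 500000500000
```

If the printed reload time stays flat over hours, the slowdown is likely in the streaming side rather than the database.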