I ran into a problem with Spark Structured Streaming: the YARN usercache keeps growing because of a join operation and is never released. How can I optimize the configuration and/or the code to solve this?
Spark cluster on AWS EMR:

- 1 master node: m4.xlarge, 4 cores, 16 GB
- 2 core nodes: m4.xlarge, 4 cores, 16 GB

YARN configuration:

```
'yarn.nodemanager.disk-health-checker.enable': 'true',
'yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage': '95.0',
'yarn.nodemanager.localizer.cache.cleanup.interval-ms': '100000',
'yarn.nodemanager.localizer.cache.target-size-mb': '1024',
'yarn.nodemanager.pmem-check-enabled': 'false',
'yarn.nodemanager.vmem-check-enabled': 'false',
'yarn.log-aggregation.retain-seconds': '12000'
```

spark-submit:

```
spark-submit --deploy-mode cluster --num-executors 3 --executor-memory 8G --executor-cores 2
```

Code snippet:

```scala
// Disable broadcast join
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val monitoring_stream = volume_df_filtered.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) {
      // This join operation consumes the usercache continuously.
      monitoring_df = monitoring_df
        .join(batchDF, monitoring_df("id") === batchDF("id"), "left")
        .select(monitoring_df("id"), monitoring_df("total_volume"), batchDF("volume"))
        .na.fill(0)
      monitoring_df = monitoring_df
        .withColumn("total_volume", monitoring_df("total_volume") + monitoring_df("volume"))
      monitoring_df = monitoring_df.repartition(6)
      batchDF.unpersist()
      spark.catalog.clearCache()
    }
  }
  .start()
```
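One direction I am considering, in case it helps frame the question: since `monitoring_df` is rebuilt on top of itself every batch, I suspect the query plan (and the shuffle files of every previous batch) are kept alive indefinitely. Would truncating the lineage each batch with `Dataset.localCheckpoint` release the usercache? A sketch of what I mean (untested; same `monitoring_df` and join as in my snippet above):

```scala
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  if (!batchDF.isEmpty) {
    monitoring_df = monitoring_df
      .join(batchDF, monitoring_df("id") === batchDF("id"), "left")
      .select(monitoring_df("id"), monitoring_df("total_volume"), batchDF("volume"))
      .na.fill(0)
      .withColumn("total_volume", monitoring_df("total_volume") + monitoring_df("volume"))
      .repartition(6)
      // Materialize the current state eagerly and cut the lineage, so the
      // plan no longer references the join outputs of all earlier batches.
      .localCheckpoint(eager = true)
  }
}
```

I am not sure whether `localCheckpoint` actually lets YARN clean the old shuffle files under usercache, or whether a `checkpoint()` to HDFS (or a stateful `groupBy`/aggregation instead of the iterative join) would be needed instead.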