I have run into a problem with Spark Structured Streaming: the YARN usercache is continuously consumed by a join operation and is never released. How can I optimize the configuration and/or code to solve this problem?


Spark cluster on AWS EMR:

1 master node: m4.xlarge (4 cores, 16 GB)
2 core nodes: m4.xlarge (4 cores, 16 GB)

yarn configuration:

    'yarn.nodemanager.disk-health-checker.enable': 'true',
    'yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage': '95.0',
    'yarn.nodemanager.localizer.cache.cleanup.interval-ms': '100000',
    'yarn.nodemanager.localizer.cache.target-size-mb': '1024',
    'yarn.nodemanager.pmem-check-enabled': 'false',
    'yarn.nodemanager.vmem-check-enabled': 'false',
    'yarn.log-aggregation.retain-seconds': '12000'

spark-submit:

    spark-submit \
      --deploy-mode cluster \
      --num-executors 3 --executor-memory 8G --executor-cores 2



Code snippet:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.streaming.Trigger

    // Disable broadcast joins so the per-batch join is done as a regular join
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

    // monitoring_df is a var holding the running totals; it is reassigned
    // on every micro-batch
    val monitoring_stream = volume_df_filtered.writeStream
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        if (!batchDF.isEmpty) {
          // This join operation consumes the usercache continuously
          monitoring_df = monitoring_df
            .join(batchDF, monitoring_df("id") === batchDF("id"), "left")
            .select(monitoring_df("id"), monitoring_df("total_volume"), batchDF("volume"))
            .na.fill(0)
          monitoring_df = monitoring_df
            .withColumn("total_volume", monitoring_df("total_volume") + monitoring_df("volume"))
          monitoring_df = monitoring_df.repartition(6)
          batchDF.unpersist()
          spark.catalog.clearCache()
        }
      }
      .start()
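For reference, my current suspicion is that because monitoring_df is reassigned to the result of a join against itself on every batch, its lineage grows without bound, and the shuffle files backing each step pile up in the usercache. Below is a minimal sketch of the direction I am considering, assuming lineage truncation is the right fix; updateTotals is just a hypothetical helper name I made up, while localCheckpoint() is the standard Dataset API:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Hypothetical helper: fold one micro-batch into the running totals and
    // truncate the lineage so each batch starts from a materialized result.
    def updateTotals(monitoring: DataFrame, batch: DataFrame): DataFrame = {
      val joined = monitoring
        .join(batch, monitoring("id") === batch("id"), "left")
        .select(monitoring("id"), monitoring("total_volume"), batch("volume"))
        .na.fill(0)
        .withColumn("total_volume", col("total_volume") + col("volume"))
        .drop("volume") // keep the accumulator schema stable: (id, total_volume)
      // localCheckpoint() materializes the result and cuts the logical plan,
      // so Spark no longer keeps the whole chain of per-batch joins alive.
      joined.localCheckpoint()
    }

Inside foreachBatch this would reduce to monitoring_df = updateTotals(monitoring_df, batchDF). If the unbounded lineage is indeed the cause, would this let YARN clean up the old shuffle output from the usercache, or is there a better-supported pattern for accumulating totals across batches?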
