Hi Fred,

You said you managed to fix the problem somehow and have attributed some of the issues to RDD lineage. A few things come to mind:
1. How did you fix this performance issue, which I gather you did programmatically?
2. In your code, have you set spark.conf.set("spark.sql.adaptive.enabled", "true")?
3. Depending on the size of your data (both the source and the new data), do you have any indication that the data in the global temporary view is fully cached? This should show in the Storage tab of the UI. If some of the data has spilled to disk, this will affect performance.
4. What is the output of print(rdd.toDebugString())?
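For points 2 to 4, a rough Java sketch along these lines may help (untested and only illustrative; it assumes your SparkSession variable and the global temp view name "myView" from your pseudo-code):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RefreshDiagnostics {

    public static void inspect(SparkSession spark) {
        // Point 2: enable Adaptive Query Execution and confirm the setting
        spark.conf().set("spark.sql.adaptive.enabled", "true");
        System.out.println("AQE enabled: "
                + spark.conf().get("spark.sql.adaptive.enabled", "false"));

        // Point 3: check that the combined view is registered as cached;
        // global temp views are resolved under the global_temp database.
        // The Storage tab in the UI shows how much of it sits on disk.
        System.out.println("Cached: "
                + spark.catalog().isCached("global_temp.myView"));

        Dataset<Row> combinedView = spark.table("global_temp.myView");
        System.out.println("Storage level: " + combinedView.storageLevel());

        // Point 4: print the RDD lineage (Java equivalent of
        // print(rdd.toDebugString()))
        System.out.println(combinedView.rdd().toDebugString());
    }
}

The toDebugString() output should make it clear whether the lineage really does grow after every refresh.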
I doubt this issue is caused by the RDD lineage adding additional steps that are not required.

HTH

Mich

view my Linkedin profile <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>

*Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.


On Sat, 22 May 2021 at 23:44, Fred Yeadon <f...@qad.com> wrote:

> Hi all,
>
> Working on a complex Spark 3.0.1 application, I noticed some unexpected
> Spark behavior recently that I am hoping someone can explain. The
> application is Java with many large classes, but I have tried to describe
> the essential logic below.
>
> During periodic refresh runs, the application extracts data from Cassandra
> database tables, then filters and combines them into Spark data frames
> that are cached and registered as views in the global temporary DB. The
> data frames that were previously there are uncached and replaced by the
> new ones. Remote clients can then query the views through Spark SQL.
>
> This refresh process runs in multiple threads that build each new Spark
> data frame progressively, reading one Cassandra table, filtering out the
> old contents of the view and adding the new Cassandra contents with a
> union operation. Each thread un-caches the old data frame and caches the
> new one, then runs a count() action to realize the previous
> transformations. Below is some pseudo-code for this multi-threaded logic.
>
> *************************************************************************************************
>
> // Read Cassandra table using Spark Cassandra Connector
> Dataset<Row> data =
>     sparkSession.read().format("org.apache.spark.sql.cassandra").options(params).load();
>
> // Combine data into single Spark view
> Dataset<Row> combinedView = null;
> String combinedViewName = "myView";
>
> if (<first time through the data frame>) {
>     // Start a new combined view from the contents of the source table
>     combinedView = data;
> } else {
>     // Read the existing combined view to further extend it
>     combinedView = sparkSession.table(combinedViewName);
>     …
>     // Remove stale data with filter
>     combinedView = combinedView.filter(<filter condition>);
>     …
>     // Add new data
>     combinedView = combinedView.union(data);
> }
>
> // Re-cache modified combined view
> sparkSession.catalog().uncacheTable(combinedViewName);
> combinedView.createOrReplaceGlobalTempView(combinedViewName);
> sparkSession.catalog().cacheTable(combinedViewName);
> combinedView.count();
>
> *************************************************************************************************
>
> The application works, but I recently fixed a bug where Spark SQL queries
> were running incrementally slower after each refresh, resulting in steady
> performance degradation. After investigation, I found that the
> <first time through the data frame> check logic above was not correct,
> causing a new combined data frame to build on the lineage of the old data
> frame RDD that it was replacing. The Spark physical plan of many queries
> was becoming larger and larger because 'filter' and 'union' transformations
> were being added to the same data frame. I am not yet very familiar with
> Spark query plans, but below are fragments of a physical plan before and
> after a refresh that highlight the differences.
>
> Before refresh
> ===========
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- *(8) Project …
> …
> +- *(1) Project …
>    +- *(1) Filter …
>       +- Scan In-memory table …
>          +- InMemoryRelation …
>             +- Exchange RoundRobinPartitioning(2), …
>                +- Union
>                   :- Exchange RoundRobinPartitioning(2), …
>                   :  +- Union
>                   :  :- *(1) Project …
>                   :  :  +- *(1) Filter …
>                   :  :     +- BatchScan … Cassandra Scan: <table name>
>                   ...
>                   :  :- *(2) Project …
>                   :  :  +- *(2) Filter …
>                   :  :     +- BatchScan … Cassandra Scan: <table name>
>                   ...
>                   :  :- *(3) Project …
>                   :  :  +- *(3) Filter …
>                   :  :     +- BatchScan … Cassandra Scan: <table name>
>                   ...
>                   :  :- *(8) Project …
>                   :  :  +- *(4) Filter …
>                   :  :     +- BatchScan … Cassandra Scan: <table name>
>                   ...
>
> After refresh
> ==========
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- *(8) Project …
> …
> +- *(1) Project …
>    +- *(1) Filter …
>       +- Scan In-memory table …
>          +- InMemoryRelation …
>             +- Exchange RoundRobinPartitioning(2), …
>                +- Union
>                   :- Exchange RoundRobinPartitioning(2), …
>                   :  +- Union
> NEW LINE --->     :  :- Exchange RoundRobinPartitioning(2), …
> NEW LINE --->     :  :  +- Union
>                   :  :- *(1) Project …
>                   :  :  +- *(1) Filter …
>                   :  :     +- BatchScan … Cassandra Scan: <table name>
>                   ...
>                   :  :- *(2) Project …
>                   :  :  +- *(2) Filter …
>                   :  :     +- BatchScan … Cassandra Scan: <table name>
>                   ...
>                   :  :- *(3) Project …
>                   :  :  +- *(3) Filter …
>                   :  :     +- BatchScan … Cassandra Scan: <table name>
>                   ...
>                   :  :- *(8) Project …
>                   :  :  +- *(4) Filter …
>                   :  :     +- BatchScan … Cassandra Scan: <table name>
>                   ...
>
> Once the error was fixed, query times no longer degraded.
>
> My question is this: given that the final contents of the data frame are
> being cached correctly in memory (I have verified this), why would the
> lineage of the data frame's RDD affect query performance at all? I would
> think the 'Scan In-memory table' step of the above query plan would always
> retrieve the data from cache, making the previous 'filter' and 'union'
> transformations within the lineage irrelevant to current performance. Do
> Spark SQL queries depend directly on the RDD lineage even when the final
> results have been cached?
>
> Thanks in advance for any reply you can give!
>
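As a footnote to the pseudo-code above, here is a minimal, untested Java sketch of one way the <first time through the data frame> check could be made explicit, so that each refresh starts from the freshly cached view rather than from the previous Dataset's plan. The view name, the keepCondition parameter and the class/method names are placeholders, not the application's actual code:

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CombinedViewRefresh {

    // data: freshly loaded Cassandra contents; keepCondition stands in for
    // the <filter condition> in the pseudo-code and is supplied by the caller.
    public static void refresh(SparkSession spark, Dataset<Row> data, Column keepCondition) {
        String combinedViewName = "myView";
        // Global temp views are resolved under the global_temp database
        String qualifiedName = "global_temp." + combinedViewName;

        boolean exists = spark.catalog().tableExists("global_temp", combinedViewName);

        Dataset<Row> combinedView;
        if (!exists) {
            // First time through: start the combined view from the source table
            combinedView = data;
        } else {
            // Re-read the existing view from the catalog so the new plan starts
            // at the cached in-memory relation rather than at the previous
            // Dataset object and its accumulated filter/union lineage
            combinedView = spark.table(qualifiedName)
                    .filter(keepCondition)
                    .union(data);
        }

        // Re-cache the modified combined view and materialise it, as in the
        // original pseudo-code
        if (exists) {
            spark.catalog().uncacheTable(qualifiedName);
        }
        combinedView.createOrReplaceGlobalTempView(combinedViewName);
        spark.catalog().cacheTable(qualifiedName);
        combinedView.count();
    }
}

The point of the sketch is only that each refresh derives the new plan from spark.table(...) on the freshly read view, so the filter/union chain does not keep growing across refreshes.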