szehon-ho opened a new pull request, #49840:
URL: https://github.com/apache/spark/pull/49840

   
   
   ### What changes were proposed in this pull request?
   Simplify the resolution of EXISTS_DEFAULT in ResolveDefaultColumns::getExistenceDefaultValues(), which is called from file readers.
   
   
   ### Why are the changes needed?
   Spark executors unnecessarily contact catalogs when resolving EXISTS_DEFAULT (used as the default value for existing data).
   
   Detailed explanation: the code path for default values first analyzes the user-provided CURRENT_DEFAULT expression (to evaluate functions, etc.) and stores the resulting SQL as EXISTS_DEFAULT. EXISTS_DEFAULT is saved so that existing data does not have to be rewritten (backfilled) to fill this value into the files.
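
   For illustration, a minimal spark-shell-style sketch of how the two metadata entries come about (assuming default columns are enabled for the table's provider; the table and column names are made up):

   ```scala
   // Adding a column with a non-literal default records two metadata entries:
   //   CURRENT_DEFAULT: the user expression as written, e.g. "current_date()"
   //   EXISTS_DEFAULT:  that expression already evaluated to a literal at DDL
   //                    time, used to fill the column for pre-existing rows
   spark.sql("CREATE TABLE t (id INT) USING parquet")
   spark.sql("INSERT INTO t VALUES (1)")
   spark.sql("ALTER TABLE t ADD COLUMN d DATE DEFAULT current_date()")

   // Files written before the ALTER do not contain column d; the reader fills
   // it from EXISTS_DEFAULT instead of re-evaluating current_date() at read time.
   spark.sql("SELECT * FROM t").show()
   ```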
   
   When reading existing files, Spark then resolves the EXISTS_DEFAULT metadata and uses that value for nulls it finds in the column. However, this step redundantly re-runs all the analyzer rules and the finish-analysis rules, some of which contact the catalog unnecessarily.
   
   This may cause exceptions such as the following if the executors are not properly configured to reach the catalog:
   
   ```
   Caused by: org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
     at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2400)
     at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:84)
     at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:72)
     at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:94)
     at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
     at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:94)
     at scala.Option.map(Option.scala:230)
     at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:93)
     at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:55)
     at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:130)
     at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:101)
     at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:172)
     at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:169)
     at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:502)
     at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
     at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
     at scala.collection.immutable.List.foldLeft(List.scala:91)
     at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:502)
     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:301)
     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:266)
     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:427)
     at scala.Option.map(Option.scala:230)
     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:425)
     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:423)
     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:498)
     at scala.Option.getOrElse(Option.scala:189)
     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:496)
     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.existenceDefaultValues(ResolveDefaultColumnsUtil.scala)
     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:350)
     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:373)
     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.$anonfun$apply$5(ParquetFileFormat.scala:441)
     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561)
     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:428)
     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:258)
     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:639)
     ... 21 more
   Caused by: java.lang.IllegalStateException: No active or default Spark session found
   ```
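
   A rough sketch of the direction taken (not the exact code in this PR): EXISTS_DEFAULT is stored as an already analyzed, constant-folded expression, so in the common case it can be turned back into a literal with the parser alone, without the analyzer and finish-analysis rules that may touch the catalog:

   ```scala
   import org.apache.spark.sql.catalyst.expressions.Literal
   import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

   // Sketch only: resolve a stored EXISTS_DEFAULT SQL string without running
   // the full analyzer. The value was already evaluated at DDL time, so it
   // normally parses straight to a Literal and no catalog access is needed.
   def resolveExistsDefault(sql: String): Option[Literal] =
     CatalystSqlParser.parseExpression(sql) match {
       case lit: Literal => Some(lit)   // e.g. DATE '2025-02-07', 42, 'abc'
       case _ => None                   // fall back to the analysis-based path
     }
   ```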
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Added a test in StructTypeSuite. I had to expose some members of ResolveDefaultColumns for testing.
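
   Roughly, the new test exercises something like the following (illustrative only; the exact assertions and exposed members differ):

   ```scala
   import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
   import org.apache.spark.sql.types._

   // A schema whose column carries default-value metadata. "CURRENT_DEFAULT"
   // and "EXISTS_DEFAULT" are the metadata keys the reader path inspects; the
   // literal stands in for a value computed at DDL time.
   val schema = StructType(Seq(
     StructField("id", IntegerType),
     StructField("d", DateType, nullable = true,
       metadata = new MetadataBuilder()
         .putString("CURRENT_DEFAULT", "current_date()")
         .putString("EXISTS_DEFAULT", "DATE '2025-02-07'")
         .build())))

   // The existence defaults should resolve without an active SparkSession or
   // catalog, since the stored value is already a constant.
   val defaults = ResolveDefaultColumns.existenceDefaultValues(schema)
   assert(defaults.length == schema.length)
   ```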
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

