[ 
https://issues.apache.org/jira/browse/SPARK-51119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun reassigned SPARK-51119:
-------------------------------------

    Assignee: Szehon Ho

> Readers on executors resolving EXISTS_DEFAULT should not call catalogs
> ----------------------------------------------------------------------
>
>                 Key: SPARK-51119
>                 URL: https://issues.apache.org/jira/browse/SPARK-51119
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: Szehon Ho
>            Assignee: Szehon Ho
>            Priority: Major
>              Labels: pull-request-available
>
> Summary: Spark executors unnecessarily contact catalogs when resolving 
> EXISTS_DEFAULT (used for default values for existing data).
> Detailed explanation: The code path for default values first runs an 
> analysis of the user-provided CURRENT_DEFAULT (to evaluate functions, etc.) 
> and uses the resulting SQL for EXISTS_DEFAULT. EXISTS_DEFAULT is saved so 
> that existing data does not have to be rewritten (backfilled) to store this 
> value in the files.
> When reading existing files, Spark then attempts to resolve the 
> EXISTS_DEFAULT metadata and uses the value for null values it finds in that 
> column. But this step redundantly runs all the analyzer rules and the 
> finish-analysis rules again, some of which contact the catalog unnecessarily.
> This may cause exceptions if the executors are not configured properly to 
> reach the catalog, such as:
> {code}
> Caused by: org.apache.spark.SparkException: Failed during instantiating constructor for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog.
>     at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToInstantiateConstructorForCatalogError(QueryExecutionErrors.scala:2400)
>     at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:84)
>     at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:72)
>     at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:94)
>     at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
>     at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:94)
>     at scala.Option.map(Option.scala:230)
>     at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:93)
>     at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:55)
>     at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:130)
>     at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:101)
>     at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:172)
>     at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:169)
>     at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:502)
>     at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
>     at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
>     at scala.collection.immutable.List.foldLeft(List.scala:91)
>     at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:502)
>     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:301)
>     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.analyze(ResolveDefaultColumnsUtil.scala:266)
>     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:427)
>     at scala.Option.map(Option.scala:230)
>     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$getExistenceDefaultValues$1(ResolveDefaultColumnsUtil.scala:425)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>     at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>     at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
>     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.getExistenceDefaultValues(ResolveDefaultColumnsUtil.scala:423)
>     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.$anonfun$existenceDefaultValues$2(ResolveDefaultColumnsUtil.scala:498)
>     at scala.Option.getOrElse(Option.scala:189)
>     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns$.existenceDefaultValues(ResolveDefaultColumnsUtil.scala:496)
>     at org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.existenceDefaultValues(ResolveDefaultColumnsUtil.scala)
>     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:350)
>     at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:373)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.$anonfun$apply$5(ParquetFileFormat.scala:441)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1561)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:428)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.apply(ParquetFileFormat.scala:258)
>     at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:639)
>     ... 21 more
> Caused by: java.lang.IllegalStateException: No active or default Spark session found
> {code}
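
Editorial illustration (not part of the issue description): the failure mode described above can be modeled with a small, Spark-free toy in Python. Everything here is hypothetical and chosen only for illustration: ToyCatalog, resolve_with_full_analysis, resolve_literal_only and fill_nulls are not Spark APIs, and the literal-only path is just one possible direction suggested by the issue title, not necessarily the actual fix. The sketch assumes the stored EXISTS_DEFAULT is already a constant, so a reader can parse it without touching a catalog.

```python
import ast


class ToyCatalog:
    """Hypothetical stand-in for a session catalog (not a Spark class)."""

    def __init__(self, reachable):
        self.reachable = reachable

    def current_namespace(self):
        # Mimics a catalog lookup that fails on a misconfigured executor.
        if not self.reachable:
            raise RuntimeError("catalog not reachable from this process")
        return "default"


def resolve_with_full_analysis(exists_default_sql, catalog):
    """Models the reported behavior: catalog-aware rules run again on the reader."""
    catalog.current_namespace()  # a rule that asks for the current namespace
    return ast.literal_eval(exists_default_sql)


def resolve_literal_only(exists_default_sql):
    """Models a catalog-free path: the stored text is a constant, so just parse it."""
    return ast.literal_eval(exists_default_sql)


def fill_nulls(rows, column, default):
    """Substitute the default wherever the column value is None."""
    return [{**r, column: default} if r.get(column) is None else r for r in rows]


# EXISTS_DEFAULT as stored after CURRENT_DEFAULT was analyzed once (toy value).
exists_default = "42"
old_rows = [{"id": 1, "c": None}, {"id": 2, "c": 7}]
executor_catalog = ToyCatalog(reachable=False)

try:
    resolve_with_full_analysis(exists_default, executor_catalog)
    full_analysis_failed = False
except RuntimeError:
    full_analysis_failed = True

filled = fill_nulls(old_rows, "c", resolve_literal_only(exists_default))
```

In this toy, the full-analysis path fails only because it asks the catalog for the current namespace, which the value itself never needed; the literal-only path yields the same default with no catalog access.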



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
