[ 
https://issues.apache.org/jira/browse/SPARK-51500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rakesh Kumar updated SPARK-51500:
---------------------------------
    Description: 
Spark at Meta uses DataSource V2 for warehouse integration, and certain 
queries are experiencing performance degradation because AQE fails to reuse 
stages.

The underlying problem lies in the implementation of the BatchScanExec 
operator. Specifically, its canonicalization and its equals/hashCode functions 
are inconsistent with each other, so a canonicalized BatchScanExec compares 
equal to its non-canonicalized counterpart. As a result, AQE's stageCache is 
looked up with the non-canonicalized plan, and the lookup fails to find an 
existing stage even when a reusable one exists. 
 

  was:
Spark at Meta uses DataSource V2 for warehouse integration, and certain 
queries are experiencing performance degradation. This issue arises from AQE's 
failure to reuse stages, leading to suboptimal query execution.
 
The underlying problem lies in the implementation of the BatchScanExec 
operator. Specifically, its canonicalization and equality functions are 
inconsistent with each other, so a canonicalized BatchScanExec compares equal 
to its non-canonicalized counterpart. As a result, AQE's stageCache is looked 
up with the non-canonicalized plan, and the lookup fails to find an existing 
stage even when a reusable one exists. 
{code:java}
case class BatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan,
    runtimeFilters: Seq[Expression],
    ordering: Option[Seq[SortOrder]] = None,
    @transient table: Table,
    spjParams: StoragePartitionJoinParams = StoragePartitionJoinParams()
  ) extends DataSourceV2ScanExecBase {
  ...

  override def equals(other: Any): Boolean = other match {
    case other: BatchScanExec =>
      this.batch != null && this.batch == other.batch &&
        this.runtimeFilters == other.runtimeFilters &&
        this.spjParams == other.spjParams
      // Should compare output fields as well.
    case _ =>
      false
  }

  override def doCanonicalize(): BatchScanExec = {
    this.copy(
      output = output.map(QueryPlan.normalizeExpressions(_, output)),
      runtimeFilters = QueryPlan.normalizePredicates(
        runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
        output))
  }

  ...
}
{code}
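
The mismatch described above can be illustrated with a small toy model. This is 
only a sketch under stated assumptions, not Spark's implementation: Attr, 
BrokenScan, FixedScan and replaceIfChanged are hypothetical names invented for 
the illustration. BrokenScan ignores its output in equals/hashCode, FixedScan 
compares it, and replaceIfChanged stands in for any rewrite step that keeps 
the existing node when the replacement compares equal.

```scala
// Toy model only: hypothetical stand-ins, not Spark's classes or API.
object ToyCanonicalization {
  final case class Attr(name: String, exprId: Long)

  // equals/hashCode look at `batch` but ignore `output` (the reported shape).
  final class BrokenScan(val batch: String, val output: Seq[Attr]) {
    // Canonicalization replaces names and expression IDs with ordinals.
    def canonicalized: BrokenScan =
      new BrokenScan(batch, output.indices.map(i => Attr("none", i.toLong)))

    override def equals(other: Any): Boolean = other match {
      case s: BrokenScan => batch == s.batch
      case _ => false
    }
    override def hashCode(): Int = batch.hashCode
  }

  // Case-class equality compares `output` as well as `batch`.
  final case class FixedScan(batch: String, output: Seq[Attr]) {
    def canonicalized: FixedScan =
      copy(output = output.indices.map(i => Attr("none", i.toLong)))
  }

  // Assumed rewrite rule: keep the old node if the replacement compares equal.
  def replaceIfChanged[A <: AnyRef](old: A, replacement: A): A =
    if ((replacement eq old) || replacement == old) old else replacement

  def main(args: Array[String]): Unit = {
    val attrs = Seq(Attr("id", 101L), Attr("value", 102L))
    val broken = new BrokenScan("warehouse.t", attrs)
    val fixed = FixedScan("warehouse.t", attrs)

    // The canonicalized copy is indistinguishable, so the original is kept.
    println(broken == broken.canonicalized)                            // true
    println(replaceIfChanged(broken, broken.canonicalized) eq broken)  // true

    // With output compared, the canonicalized node replaces the original.
    println(fixed == fixed.canonicalized)                              // false
    println(replaceIfChanged(fixed, fixed.canonicalized) eq fixed)     // false
  }
}
```

In this toy model the non-canonicalized node survives the rewrite only because 
equals cannot tell the two forms apart. Whether AQE's stage lookup follows 
exactly this path in Spark is an assumption of the sketch.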
 
 


> AQE does not reuse exchange/stage when the stage has BatchScanExec operator
> ---------------------------------------------------------------------------
>
>                 Key: SPARK-51500
>                 URL: https://issues.apache.org/jira/browse/SPARK-51500
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1, 3.5.1, 3.4.4
>            Reporter: Rakesh Kumar
>            Priority: Major


