Rakesh Kumar created SPARK-51500:
------------------------------------

             Summary: AQE does not reuse exchange/stage when the stage has 
BatchScanExec operator
                 Key: SPARK-51500
                 URL: https://issues.apache.org/jira/browse/SPARK-51500
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.4.4, 3.5.1, 3.4.1
            Reporter: Rakesh Kumar


Spark at Meta is using DatasourcesV2 for warehouse integration and certain 
queries are experiencing performance degradation. This issue arises from AQE's 
failure to reuse stages, leading to suboptimal query execution.
 
The underlying problem lies in the implementation of the BatchScanExec 
operator. Specifically, the canonicalization and equality functions of this 
operator do not reconcile, resulting in the canonicalized BatchScanExec 
becoming semantically equal to its non-canonicalized counterpart. This causes 
the AQE's stageCache to use the non-canonicalized plan for lookup, which fails 
to find existing stages even if a reusable stage exists. 
{code:java}
case class BatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan,
    runtimeFilters: Seq[Expression],
    ordering: Option[Seq[SortOrder]] = None,
    @transient table: Table,
    spjParams: StoragePartitionJoinParams = StoragePartitionJoinParams()
  ) extends DataSourceV2ScanExecBase {
...
override def equals(other: Any): Boolean = other match {
    case other: BatchScanExec =>
      this.batch != null && this.batch == other.batch &&
          this.runtimeFilters == other.runtimeFilters &&
          this.spjParams == other.spjParams
         // Should compare output fields as well.
       case _ =>
      false
  }   

override def doCanonicalize(): BatchScanExec = {
    this.copy(
      output = output.map(QueryPlan.normalizeExpressions(_, output)),
      runtimeFilters = QueryPlan.normalizePredicates(
        runtimeFilters.filterNot(_ == 
DynamicPruningExpression(Literal.TrueLiteral)),
        output))
  }

...

}{code}
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to