lifulong commented on code in PR #32816:
URL: https://github.com/apache/spark/pull/32816#discussion_r1701573785


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -97,27 +97,36 @@ case class AdaptiveSparkPlanExec(
     AQEUtils.getRequiredDistribution(inputPlan)
   }
 
+  @transient private val costEvaluator =
+    conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match {
+      case Some(className) => CostEvaluator.instantiate(className, 
session.sparkContext.getConf)
+      case _ => 
SimpleCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN))
+    }
+
   // A list of physical plan rules to be applied before creation of query 
stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition 
or removal of
   // Exchange nodes) after running these rules.
-  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = 
Seq(
-    RemoveRedundantProjects,
+  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = {
     // For cases like `df.repartition(a, b).select(c)`, there is no 
distribution requirement for
     // the final plan, but we do need to respect the user-specified 
repartition. Here we ask
     // `EnsureRequirements` to not optimize out the user-specified 
repartition-by-col to work
     // around this case.
-    EnsureRequirements(optimizeOutRepartition = 
requiredDistribution.isDefined),
-    RemoveRedundantSorts,
-    DisableUnnecessaryBucketedScan
-  ) ++ context.session.sessionState.queryStagePrepRules
+    val ensureRequirements =
+      EnsureRequirements(requiredDistribution.isDefined, requiredDistribution)
+    Seq(
+      RemoveRedundantProjects,
+      ensureRequirements,
+      RemoveRedundantSorts,
+      DisableUnnecessaryBucketedScan,
+      OptimizeSkewedJoin(ensureRequirements, costEvaluator)

Review Comment:
   hello, i have a question here
   why change OptimizeSkewedJoin rule from queryStageOptimizerRules to 
queryStagePreparationRules,  queryStagePreparationRules will used in the whole 
plan other than current new stage, this will cause ValidateRequirements check 
false while the whole plan contains WindowGroupLimitExec(row_number) or 
HashAggrageteExec(group by) node after skew join
   



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -97,27 +97,36 @@ case class AdaptiveSparkPlanExec(
     AQEUtils.getRequiredDistribution(inputPlan)
   }
 
+  @transient private val costEvaluator =
+    conf.getConf(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS) match {
+      case Some(className) => CostEvaluator.instantiate(className, 
session.sparkContext.getConf)
+      case _ => 
SimpleCostEvaluator(conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN))
+    }
+
   // A list of physical plan rules to be applied before creation of query 
stages. The physical
   // plan should reach a final status of query stages (i.e., no more addition 
or removal of
   // Exchange nodes) after running these rules.
-  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = 
Seq(
-    RemoveRedundantProjects,
+  @transient private val queryStagePreparationRules: Seq[Rule[SparkPlan]] = {
     // For cases like `df.repartition(a, b).select(c)`, there is no 
distribution requirement for
     // the final plan, but we do need to respect the user-specified 
repartition. Here we ask
     // `EnsureRequirements` to not optimize out the user-specified 
repartition-by-col to work
     // around this case.
-    EnsureRequirements(optimizeOutRepartition = 
requiredDistribution.isDefined),
-    RemoveRedundantSorts,
-    DisableUnnecessaryBucketedScan
-  ) ++ context.session.sessionState.queryStagePrepRules
+    val ensureRequirements =
+      EnsureRequirements(requiredDistribution.isDefined, requiredDistribution)
+    Seq(
+      RemoveRedundantProjects,
+      ensureRequirements,
+      RemoveRedundantSorts,
+      DisableUnnecessaryBucketedScan,
+      OptimizeSkewedJoin(ensureRequirements, costEvaluator)

Review Comment:
   ValidateRequirements: ValidateRequirements failed: 
ClusteredDistribution(ArrayBuffer(exp_name#16, client_id#27, 
utm_linkid#19L),false,None) | (UnknownPartitioning(9907) or Unk      
nownPartitioning(9907))
   error log as an example



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to