hvanhovell commented on code in PR #49584:
URL: https://github.com/apache/spark/pull/49584#discussion_r1936120463


##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala:
##########
@@ -440,46 +443,64 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
    * `spark.connect.session.planCache.enabled` is true.
    * @param rel
    *   The relation to transform.
-   * @param cachePlan
-   *   Whether to cache the result logical plan.
    * @param transform
    *   Function to transform the relation into a logical plan.
    * @return
-   *   The logical plan.
+   *   The logical plan and a flag indicating that the plan cache was hit.
    */
-  private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)(
-      transform: proto.Relation => LogicalPlan): LogicalPlan = {
-    val planCacheEnabled = Option(session)
-      .forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true))
-    // We only cache plans that have a plan ID.
-    val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId
-
-    def getPlanCache(rel: proto.Relation): Option[LogicalPlan] =
-      planCache match {
-        case Some(cache) if planCacheEnabled && hasPlanId =>
-          Option(cache.getIfPresent(rel)) match {
-            case Some(plan) =>
-              logDebug(s"Using cached plan for relation '$rel': $plan")
-              Some(plan)
-            case None => None
-          }
-        case _ => None
-      }
-    def putPlanCache(rel: proto.Relation, plan: LogicalPlan): Unit =
-      planCache match {
-        case Some(cache) if planCacheEnabled && hasPlanId =>
-          cache.put(rel, plan)
-        case _ =>
+  private[connect] def usePlanCache(rel: proto.Relation)(
+      transform: proto.Relation => LogicalPlan): (LogicalPlan, Boolean) =
+    planCache match {
+      case Some(cache) if planCacheEnabled(rel) =>
+        Option(cache.getIfPresent(rel)) match {
+          case Some(plan) =>
+            logDebug(s"Using cached plan for relation '$rel': $plan")
+            (plan, true)
+          case None => (transform(rel), false)
+        }
+      case _ => (transform(rel), false)
+    }
+
+  /**
+   * Create a data frame from the supplied relation, and update the plan cache.
+   *
+   * @param rel
+   *   A proto.Relation to create a data frame.
+   * @param options
+   *   Options to pass to the data frame.
+   * @return
+   *   The created data frame.
+   */
+  private[connect] def createDataFrame(
+      rel: proto.Relation,
+      planner: SparkConnectPlanner,
+      options: Option[(QueryPlanningTracker, ShuffleCleanupMode)] = None): DataFrame = {
+    val (plan, cacheHit) = planner.transformRelationWithCache(rel)
+    val df = options match {
+      case Some((tracker, shuffleCleanupMode)) =>
+        Dataset.ofRows(session, plan, tracker, shuffleCleanupMode)
+      case _ => Dataset.ofRows(session, plan)
+    }
+    if (!cacheHit && planCache.isDefined && planCacheEnabled(rel)) {
+      if (df.queryExecution.isLazyAnalysis) {
+        val plan = df.queryExecution.logical
+        logDebug(s"Cache a lazyily analyzed logical plan for '$rel': $plan")
+        planCache.get.put(rel, plan)
+      } else {
+        val plan = df.queryExecution.analyzed
+        logDebug(s"Cache an analyzed logical plan for '$rel': $plan")
+        planCache.get.put(rel, plan)
       }
+    }
+    df
+  }
 
-    getPlanCache(rel)
-      .getOrElse({
-        val plan = transform(rel)
-        if (cachePlan) {
-          putPlanCache(rel, plan)
-        }
-        plan
-      })
+  // Return true if the plan cache is enabled for the session and the relation.
+  private def planCacheEnabled(rel: proto.Relation): Boolean = {

Review Comment:
   Naming nit: how about `canCachePlan(...)` instead of `planCacheEnabled(...)`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to