changgyoopark-db commented on code in PR #49584: URL: https://github.com/apache/spark/pull/49584#discussion_r1936816392
########## sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala: ########## @@ -440,46 +443,64 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio * `spark.connect.session.planCache.enabled` is true. * @param rel * The relation to transform. - * @param cachePlan - * Whether to cache the result logical plan. * @param transform * Function to transform the relation into a logical plan. * @return - * The logical plan. + * The logical plan and a flag indicating that the plan cache was hit. */ - private[connect] def usePlanCache(rel: proto.Relation, cachePlan: Boolean)( - transform: proto.Relation => LogicalPlan): LogicalPlan = { - val planCacheEnabled = Option(session) - .forall(_.sessionState.conf.getConf(Connect.CONNECT_SESSION_PLAN_CACHE_ENABLED, true)) - // We only cache plans that have a plan ID. - val hasPlanId = rel.hasCommon && rel.getCommon.hasPlanId - - def getPlanCache(rel: proto.Relation): Option[LogicalPlan] = - planCache match { - case Some(cache) if planCacheEnabled && hasPlanId => - Option(cache.getIfPresent(rel)) match { - case Some(plan) => - logDebug(s"Using cached plan for relation '$rel': $plan") - Some(plan) - case None => None - } - case _ => None - } - def putPlanCache(rel: proto.Relation, plan: LogicalPlan): Unit = - planCache match { - case Some(cache) if planCacheEnabled && hasPlanId => - cache.put(rel, plan) - case _ => + private[connect] def usePlanCache(rel: proto.Relation)( + transform: proto.Relation => LogicalPlan): (LogicalPlan, Boolean) = + planCache match { + case Some(cache) if planCacheEnabled(rel) => + Option(cache.getIfPresent(rel)) match { + case Some(plan) => + logDebug(s"Using cached plan for relation '$rel': $plan") + (plan, true) + case None => (transform(rel), false) + } + case _ => (transform(rel), false) + } + + /** + * Create a data frame from the supplied relation, and update the plan cache. 
+ * + * @param rel + * A proto.Relation to create a data frame. + * @param options + * Options to pass to the data frame. + * @return + * The created data frame. + */ + private[connect] def createDataFrame( + rel: proto.Relation, + planner: SparkConnectPlanner, + options: Option[(QueryPlanningTracker, ShuffleCleanupMode)] = None): DataFrame = { + val (plan, cacheHit) = planner.transformRelationWithCache(rel) + val df = options match { + case Some((tracker, shuffleCleanupMode)) => + Dataset.ofRows(session, plan, tracker, shuffleCleanupMode) + case _ => Dataset.ofRows(session, plan) + } + if (!cacheHit && planCache.isDefined && planCacheEnabled(rel)) { + if (df.queryExecution.isLazyAnalysis) { + val plan = df.queryExecution.logical + logDebug(s"Cache a lazily analyzed logical plan for '$rel': $plan") + planCache.get.put(rel, plan) + } else { + val plan = df.queryExecution.analyzed Review Comment: Agreed. I'll need to think about the interface. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org