aakash-db commented on code in PR #51003:
URL: https://github.com/apache/spark/pull/51003#discussion_r2111055939
########## sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import scala.util.Try
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.classic.{DataFrame, DataStreamReader, Dataset, SparkSession}
+import org.apache.spark.sql.pipelines.{AnalysisWarning, Language}
+import org.apache.spark.sql.pipelines.graph.GraphIdentifierManager.{ExternalDatasetIdentifier, InternalDatasetIdentifier}
+import org.apache.spark.sql.pipelines.util.{BatchReadOptions, InputReadOptions, StreamingReadOptions}
+
+object FlowAnalysis {
+  def createFlowFunctionFromLogicalPlan(plan: LogicalPlan): FlowFunction = {
+    new FlowFunction {
+      override def call(
+          allInputs: Set[TableIdentifier],
+          availableInputs: Seq[Input],
+          confs: Map[String, String],
+          currentCatalog: Option[String],
+          currentDatabase: Option[String]
+      ): FlowFunctionResult = {
+        val ctx = FlowAnalysisContext(
+          allInputs = allInputs,
+          availableInputs = availableInputs,
+          currentCatalog = currentCatalog,
+          currentDatabase = currentDatabase,
+          spark = SparkSession.active
+        )
+        // Apply the flow-level SQL confs before analysis; the result is captured
+        // in a Try, and the session's original confs are always restored.
+        val df = try {
+          confs.foreach { case (k, v) => ctx.setConf(k, v) }
+          Try(FlowAnalysis.analyze(ctx, plan))
+        } finally {
+          ctx.restoreOriginalConf()
+        }
+        FlowFunctionResult(
+          requestedInputs = ctx.requestedInputs.toSet,
+          usedBatchInputs = ctx.batchInputs.toSet,
+          usedStreamingInputs = ctx.streamingInputs.toSet,
+          usedExternalInputs = ctx.externalInputs.toSet,
+          dataFrame = df,
+          sqlConf = confs,
+          analysisWarnings = ctx.analysisWarnings.toList
+        )
+      }
+    }
+  }
+
+  /**
+   * Constructs an analyzed [[DataFrame]] from a [[LogicalPlan]] by resolving Pipelines-specific
+   * TVFs and datasets that cannot be resolved directly by Catalyst.
+   *
+   * This function shouldn't call into any singletons, as that would break concurrent access to
+   * graph analysis; nor use any thread-local variables, since graph analysis and this function
+   * may run on different threads in the Python REPL.
+   *
+   * @param plan The [[LogicalPlan]] defining a flow.
+   * @return An analyzed [[DataFrame]].
+   */
+  private def analyze(
+      context: FlowAnalysisContext,
+      plan: LogicalPlan
+  ): DataFrame = {
+    // Users can define CTEs within their CREATE statements. For example,
+    //
+    // CREATE STREAMING TABLE a
+    // WITH b AS (
+    //   SELECT * FROM STREAM upstream
+    // )
+    // SELECT * FROM b
+    //
+    // The relation defined using the WITH keyword is not included in the children of the main
+    // plan, so the specific analysis we do below would not be applied to those relations.
+    // Instead, we call an analyzer rule to inline all of the CTE relations into the main plan
+    // before we do our analysis. This rule would be called during analysis anyway; we just call
+    // it earlier so that we only need to apply our analysis to a single logical plan.
+    val planWithInlinedCTEs = CTESubstitution(plan)
+
+    val spark = context.spark
+    // Traverse the user's query plan and recursively resolve nodes that reference Pipelines
+    // features that the Spark analyzer is unable to resolve.
+    val resolvedPlan = planWithInlinedCTEs transformWithSubqueries {
+      // Streaming read on another dataset.
+      // This branch will be hit for the following kinds of queries:
+      //   - SELECT ... FROM STREAM(t1)
+      //   - SELECT ... FROM STREAM t1
+      case u: UnresolvedRelation if u.isStreaming =>
+        readStreamInput(

Review Comment:
   Because this could be a table created within the pipeline, and thus not materialized yet, the analyzer wouldn't be able to resolve the query in this case.
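   To make the "not materialized yet" point concrete, here is a minimal sketch of the idea. It is not this PR's implementation; `resolveInternalReads`, `pipelinePlans`, and the table names are hypothetical, and it assumes single-part identifiers. It shows why a streaming read of a pipeline-defined table has to be resolved from in-memory plans rather than the catalog:

   ```scala
   import org.apache.spark.sql.catalyst.TableIdentifier
   import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

   // Suppose the pipeline defines both tables, so at analysis time
   // `events_clean` has no catalog entry yet:
   //
   //   CREATE STREAMING TABLE events_clean AS SELECT * FROM STREAM raw_events
   //   CREATE STREAMING TABLE events_agg AS SELECT count(*) FROM STREAM events_clean
   //
   // References to pipeline-defined datasets are therefore resolved from an
   // in-memory map of the pipeline's own plans instead of the catalog.
   def resolveInternalReads(
       plan: LogicalPlan,
       pipelinePlans: Map[TableIdentifier, LogicalPlan]): LogicalPlan = {
     plan.transform {
       case u: UnresolvedRelation
           if u.isStreaming && pipelinePlans.contains(TableIdentifier(u.tableName)) =>
         // Substitute the upstream flow's plan for the unresolved reference, so
         // Catalyst never needs a catalog lookup for the not-yet-created table.
         pipelinePlans(TableIdentifier(u.tableName))
     }
   }
   ```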
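   For reference, a hedged usage sketch of the flow-function contract, based only on the signatures in this diff (`plan` is some flow's LogicalPlan; the inputs, confs, and names are stand-ins):

   ```scala
   import scala.util.{Failure, Success}
   import org.apache.spark.sql.catalyst.TableIdentifier

   val fn = FlowAnalysis.createFlowFunctionFromLogicalPlan(plan)
   val result = fn.call(
     allInputs = Set(TableIdentifier("events_clean")),
     availableInputs = Seq.empty,
     confs = Map("spark.sql.ansi.enabled" -> "true"),
     currentCatalog = Some("spark_catalog"),
     currentDatabase = Some("default"))

   // dataFrame is a Try[DataFrame]: analysis errors are captured, not thrown.
   result.dataFrame match {
     case Success(df) => df.explain()
     case Failure(e) => println(s"flow analysis failed: $e") // e.g. unresolved input
   }
   ```

   Capturing the result in a Try rather than throwing lets the graph resolver keep analyzing the remaining flows and report all failures together.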