aakash-db commented on code in PR #51003:
URL: https://github.com/apache/spark/pull/51003#discussion_r2111055939


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowAnalysis.scala:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import scala.util.Try
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.classic.{DataFrame, DataStreamReader, Dataset, SparkSession}
+import org.apache.spark.sql.pipelines.{AnalysisWarning, Language}
+import org.apache.spark.sql.pipelines.graph.GraphIdentifierManager.{ExternalDatasetIdentifier, InternalDatasetIdentifier}
+import org.apache.spark.sql.pipelines.util.{BatchReadOptions, InputReadOptions, StreamingReadOptions}
+
+object FlowAnalysis {
+  def createFlowFunctionFromLogicalPlan(plan: LogicalPlan): FlowFunction = {
+    new FlowFunction {
+      override def call(
+          allInputs: Set[TableIdentifier],
+          availableInputs: Seq[Input],
+          confs: Map[String, String],
+          currentCatalog: Option[String],
+          currentDatabase: Option[String]
+      ): FlowFunctionResult = {
+        val ctx = FlowAnalysisContext(
+          allInputs = allInputs,
+          availableInputs = availableInputs,
+          currentCatalog = currentCatalog,
+          currentDatabase = currentDatabase,
+          spark = SparkSession.active
+        )
+        val df = try {
+          confs.foreach { case (k, v) => ctx.setConf(k, v) }
+          Try(FlowAnalysis.analyze(ctx, plan))
+        } finally {
+          ctx.restoreOriginalConf()
+        }
+        FlowFunctionResult(
+          requestedInputs = ctx.requestedInputs.toSet,
+          usedBatchInputs = ctx.batchInputs.toSet,
+          usedStreamingInputs = ctx.streamingInputs.toSet,
+          usedExternalInputs = ctx.externalInputs.toSet,
+          dataFrame = df,
+          sqlConf = confs,
+          analysisWarnings = ctx.analysisWarnings.toList
+        )
+      }
+    }
+  }
+
+  /**
+   * Constructs an analyzed [[DataFrame]] from a [[LogicalPlan]] by resolving Pipelines-specific
+   * TVFs and datasets that cannot be resolved directly by Catalyst.
+   *
+   * This function shouldn't call any singletons, as that would break concurrent access to graph
+   * analysis; nor should it use any thread-local variables, since graph analysis and this
+   * function may run on different threads in the Python REPL.
+   *
+   * @param context  The [[FlowAnalysisContext]] tracking the inputs and confs used during
+   *                 analysis.
+   * @param plan     The [[LogicalPlan]] defining a flow.
+   * @return An analyzed [[DataFrame]].
+   */
+  private def analyze(
+      context: FlowAnalysisContext,
+      plan: LogicalPlan
+  ): DataFrame = {
+    // Users can define CTEs within their CREATE statements. For example,
+    //
+    // CREATE STREAMING TABLE a
+    // WITH b AS (
+    //    SELECT * FROM STREAM upstream
+    // )
+    // SELECT * FROM b
+    //
+    // The relation defined with the WITH keyword is not included in the children of the main
+    // plan, so the specific analysis we do below would not be applied to it. Instead, we call
+    // an analyzer rule to inline all of the CTE relations into the main plan before we do our
+    // analysis. This rule would be called during analysis anyway; we just call it earlier so
+    // we only need to apply our analysis to a single logical plan.
+    val planWithInlinedCTEs = CTESubstitution(plan)
+
+    val spark = context.spark
+    // Traverse the user's query plan and recursively resolve nodes that reference Pipelines
+    // features that the Spark analyzer is unable to resolve.
+    val resolvedPlan = planWithInlinedCTEs transformWithSubqueries {
+        // Streaming read on another dataset
+        // This branch will be hit for the following kinds of queries:
+        // - SELECT ... FROM STREAM(t1)
+        // - SELECT ... FROM STREAM t1
+        case u: UnresolvedRelation if u.isStreaming =>
+          readStreamInput(

Review Comment:
   Because this could be a table created within the pipeline, and thus not materialized yet, the analyzer wouldn't be able to resolve the query in this case.
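
   For illustration, a minimal sketch of that resolution pattern (this is not the PR's code: `lookup` is a hypothetical stand-in for `readStreamInput`, returning the DataFrame of a dataset defined in the pipeline but not yet materialized in any catalog):

   ```scala
   import org.apache.spark.sql.DataFrame
   import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
   import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

   // Sketch: rewrite streaming reads against the pipeline graph instead of
   // the catalog, since the catalog cannot resolve an unmaterialized table.
   def resolveStreamingReads(
       plan: LogicalPlan,
       lookup: Seq[String] => DataFrame): LogicalPlan = {
     plan transform {
       case u: UnresolvedRelation if u.isStreaming =>
         // Swap the unresolved relation for the in-pipeline dataset's
         // logical plan, bypassing catalog resolution entirely.
         lookup(u.multipartIdentifier).queryExecution.logical
     }
   }
   ```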



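As a side note on the CTESubstitution call earlier in the diff, the inlining it performs can be exercised standalone. A spark-shell-style sketch (illustrative only; `upstream` is the table name from the comment in the diff, and the session setup is an assumption):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.CTESubstitution

// Parse a query with a WITH clause, then inline the CTE before any
// custom traversal, mirroring the CTESubstitution(plan) call above.
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val parsed = spark.sessionState.sqlParser.parsePlan(
  "WITH b AS (SELECT * FROM upstream) SELECT * FROM b")
// After substitution, `b` is replaced by its definition, so a single
// transformWithSubqueries pass sees every relation in one plan.
val inlined = CTESubstitution(parsed)
```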