JiaqiWang18 commented on code in PR #51644:
URL: https://github.com/apache/spark/pull/51644#discussion_r2380329356


##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/SystemMetadataSuite.scala:
##########
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import java.nio.file.Files
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.execution.streaming.{MemoryStream, 
StreamingQueryWrapper}
+import org.apache.spark.sql.pipelines.utils.{ExecutionTest, 
TestGraphRegistrationContext}
+import org.apache.spark.sql.test.SharedSparkSession
+
+class SystemMetadataSuite
+    extends ExecutionTest
+    with SystemMetadataTestHelpers
+    with SharedSparkSession {
+  test("flow checkpoint for ST wrote to the expected location") {
+    val session = spark
+    import session.implicits._
+
+    // create a pipeline with only a single ST
+    val graph = new TestGraphRegistrationContext(spark) {
+      val mem: MemoryStream[Int] = MemoryStream[Int]
+      mem.addData(1, 2, 3)
+      registerView("a", query = dfFlowFunc(mem.toDF()))
+      registerTable("st")
+      registerFlow("st", "st", query = readStreamFlowFunc("a"))
+    }.toDataflowGraph
+
+    val testStorageRoot = 
Files.createTempDirectory("TestUpdateContext").normalize.toString
+    val updateContext1 = TestPipelineUpdateContext(
+      unresolvedGraph = graph,
+      spark = spark,
+      storageRootOpt = Option(testStorageRoot)
+    )
+
+    // start an update in continuous mode, checkpoints are only created to 
streaming query
+    updateContext1.pipelineExecution.startPipeline()
+    updateContext1.pipelineExecution.awaitCompletion()
+
+    val graphExecution1 = updateContext1.pipelineExecution.graphExecution.get
+    val executionGraph1 = graphExecution1.graphForExecution
+
+    val stIdentifier = fullyQualifiedIdentifier("st")
+
+    val expectedStorageSuffix = "st"
+
+    // assert that the checkpoint dir for the ST is created as expected
+    assertFlowCheckpointDirExists(
+      tableOrSinkElement = executionGraph1.table(stIdentifier),
+      flowElement = executionGraph1.flow(stIdentifier),
+      expectedStorageName = expectedStorageSuffix,
+      // the default checkpoint version is 0
+      expectedCheckpointVersion = 0,
+      graphExecution = graphExecution1,
+      updateContext = updateContext1
+    )
+
+    // start another update in full refresh, expected a new checkpoint dir to 
be created
+    // with version number incremented to 1
+    val updateContext2 = TestPipelineUpdateContext(
+      unresolvedGraph = graph,
+      spark = spark,
+      storageRootOpt = Option(testStorageRoot),
+      fullRefreshTables = AllTables
+    )
+
+    updateContext2.pipelineExecution.startPipeline()
+    updateContext2.pipelineExecution.awaitCompletion()
+    val graphExecution2 = updateContext2.pipelineExecution.graphExecution.get
+    val executionGraph2 = graphExecution2.graphForExecution
+
+    // due to full refresh, assert that new checkpoint dir is created with 
version number
+    // incremented to 1
+    assertFlowCheckpointDirExists(
+      tableOrSinkElement = executionGraph2.table(stIdentifier),
+      flowElement = executionGraph1.flow(stIdentifier),
+      expectedStorageName = expectedStorageSuffix,
+      // new checkpoint directory is created
+      expectedCheckpointVersion = 1,
+      graphExecution = graphExecution2,
+      updateContext2
+    )
+  }
+
+  test(
+    "flow checkpoint for ST (with flow name different from table name) wrote 
to the expected " +
+    "location"
+  ) {
+    val session = spark
+    import session.implicits._
+
+    val graph = new TestGraphRegistrationContext(spark) {
+      val mem: MemoryStream[Int] = MemoryStream[Int]
+      mem.addData(1, 2, 3)
+      registerView("a", query = dfFlowFunc(mem.toDF()))
+      registerTable("st")
+      registerFlow("st", "st_flow", query = readStreamFlowFunc("a"))
+    }.toDataflowGraph
+
+    val testStorageRoot = 
Files.createTempDirectory("TestUpdateContext").normalize.toString
+    val updateContext1 = TestPipelineUpdateContext(
+      unresolvedGraph = graph,
+      spark = spark,
+      storageRootOpt = Option(testStorageRoot)
+    )
+
+    // start an update in continuous mode, checkpoints are only created to 
streaming query
+    updateContext1.pipelineExecution.startPipeline()
+    updateContext1.pipelineExecution.awaitCompletion()
+
+    val graphExecution1 = updateContext1.pipelineExecution.graphExecution.get
+    val executionGraph1 = graphExecution1.graphForExecution
+
+    val stIdentifier = fullyQualifiedIdentifier("st")
+
+    val expectedStorageSuffix = "st_flow"
+
+    // assert that the checkpoint dir for the ST is created as expected
+    assertFlowCheckpointDirExists(
+      tableOrSinkElement = executionGraph1.table(stIdentifier),
+      flowElement = executionGraph1.flow(stIdentifier),
+      expectedStorageName = expectedStorageSuffix,
+      // the default checkpoint version is 0
+      expectedCheckpointVersion = 0,
+      graphExecution = graphExecution1,
+      updateContext = updateContext1
+    )
+
+    // start another update in full refresh, expected a new checkpoint dir to 
be created
+    // with version number incremented to 1

Review Comment:
   also for my understanding, why do we expect the version number to be `1` 
now? How does the new checkpoint versioning logic interact with refresh options?



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SystemMetadata.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.classic.SparkSession
+
+sealed trait SystemMetadata {}
+
+/**
+ * Represents the system metadata associated with a [[Flow]].
+ */
+case class FlowSystemMetadata(
+    context: PipelineUpdateContext,
+    flow: Flow,
+    graph: DataflowGraph
+) extends SystemMetadata {
+
+  /**
+   * Returns the checkpoint root directory for a given flow.
+   * @return the checkpoint root directory for `flow`
+   */
+  private def flowCheckpointsDirOpt(): Option[Path] = {

Review Comment:
   for my understanding, are we changing the checkpoint directory location, and 
why do we need to explicitly manage checkpoint location ourselves? Is it for 
implementing our custom versioned checkpoint?



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/SystemMetadata.scala:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.graph
+
+import scala.util.Try
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.classic.SparkSession
+
+sealed trait SystemMetadata {}
+
+/**
+ * Represents the system metadata associated with a [[Flow]].
+ */
+case class FlowSystemMetadata(
+    context: PipelineUpdateContext,
+    flow: Flow,
+    graph: DataflowGraph
+) extends SystemMetadata {
+
+  /**
+   * Returns the checkpoint root directory for a given flow.
+   * @return the checkpoint root directory for `flow`
+   */
+  private def flowCheckpointsDirOpt(): Option[Path] = {
+    Option(if (graph.sink.contains(flow.destinationIdentifier)) {
+      val storageRoot = context.storageRootOpt.getOrElse(
+        // TODO: better error
+        throw new IllegalArgumentException("Storage root must be defined for 
flow checkpoints")
+      )
+      val storageFlowName = flow.identifier.table
+      new Path(new Path(storageRoot, "checkpoints"), storageFlowName)
+    } else if (graph.table.contains(flow.destinationIdentifier)) {
+      val storageFlowName = flow.identifier.table
+      new Path(
+        new Path(graph.table(flow.destinationIdentifier).path, "checkpoints"),
+        storageFlowName
+      )
+    } else {
+      // TODO: raise an error
+      throw new IllegalArgumentException(
+        s"Flow ${flow.identifier} does not have a valid destination for 
checkpoints."
+      )
+    })
+  }
+
+  /** Returns the location for the most recent checkpoint of a given flow. */
+  def latestCheckpointLocation: String = {
+    val checkpointsDir = flowCheckpointsDirOpt().get
+    SystemMetadata.getLatestCheckpointDir(checkpointsDir)
+  }
+
+  /**
+   * Same as [[latestCheckpointLocation()]] but returns [[None]] if the flow 
checkpoints directory
+   * does not exist.
+   */
+  def latestCheckpointLocationOpt(): Option[String] = {
+    flowCheckpointsDirOpt().map { flowCheckpointsDir =>
+      SystemMetadata.getLatestCheckpointDir(flowCheckpointsDir)
+    }
+  }
+}
+
+object SystemMetadata {
+  private def spark = SparkSession.getActiveSession.get
+
+  /**
+   * Finds the largest checkpoint version subdirectory path within a 
checkpoint directory, or
+   * creates and returns a version 0 subdirectory path if no versions exist.
+   * @param rootDir The root/parent directory where all the numbered 
checkpoint subdirectories are
+   *                stored
+   * @param createNewCheckpointDir If true, a new latest numbered checkpoint 
directory should be
+   *                               created and returned
+   * @return The string URI path to the latest checkpoint directory
+   */
+  def getLatestCheckpointDir(
+      rootDir: Path,
+      createNewCheckpointDir: Boolean = false
+  ): String = {

Review Comment:
   For my own understanding, why do we need a versioned checkpoint directory?



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphRegistrationContext.scala:
##########
@@ -45,17 +49,20 @@ class GraphRegistrationContext(
     views += viewDef
   }
 
+  def registerSink(sinkDef: Sink): Unit = {
+    sinks += sinkDef
+  }
+
   def registerFlow(flowDef: UnresolvedFlow): Unit = {
     flows += flowDef.copy(sqlConf = defaultSqlConf ++ flowDef.sqlConf)
   }
 
   def toDataflowGraph: DataflowGraph = {
-    if (tables.isEmpty && views.collect { case v: PersistedView =>
-        v
-      }.isEmpty) {
-      throw new AnalysisException(
-        errorClass = "RUN_EMPTY_PIPELINE",
-        messageParameters = Map.empty)
+    if (tables.isEmpty && views.collect {
+        case v: PersistedView =>
+          v
+      }.isEmpty && sinks.isEmpty) {

Review Comment:
   so if a pipeline only has sinks, does it count as a valid pipeline?



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/PipelineExecution.scala:
##########
@@ -46,6 +46,10 @@ class PipelineExecution(context: PipelineUpdateContext) {
   def startPipeline(): Unit = synchronized {
     // Initialize the graph.
     val resolvedGraph = resolveGraph()
+    if (context.fullRefreshTables.nonEmpty) {
+      State.reset(resolvedGraph, context)

Review Comment:
   Does this need to happen after `DatasetManager.materializeDatasets`? It seems 
like we only know `normalizedPath` after `materializeTable` 
[here](https://github.com/sryza/spark/blob/ac211435c7a4ca1a34a52e61c0647355ebd29476/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala#L210).



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/FlowExecution.scala:
##########
@@ -261,3 +271,30 @@ class BatchTableWrite(
       }
     }
 }
+
+/** A `StreamingFlowExecution` that writes a streaming `DataFrame` to a 
`Table`. */
+class SinkWrite(
+    val identifier: TableIdentifier,
+    val flow: ResolvedFlow,
+    val graph: DataflowGraph,
+    val updateContext: PipelineUpdateContext,
+    val checkpointPath: String,
+    val trigger: Trigger,
+    val destination: Sink,
+    val sqlConf: Map[String, String]
+) extends StreamingFlowExecution {
+
+  override def getOrigin: QueryOrigin = flow.origin
+
+  def startStream(): StreamingQuery = {
+    val data = graph.reanalyzeFlow(flow).df

Review Comment:
   Why do we need to reanalyze the flow before execution? It seems like we are 
doing this for table writes as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to