cloud-fan commented on code in PR #50875:
URL: https://github.com/apache/spark/pull/50875#discussion_r2095768058


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala:
##########
@@ -1215,4 +1215,129 @@ class SparkSqlAstBuilder extends AstBuilder {
     withIdentClause(ctx.identifierReference(), procIdentifier =>
       DescribeProcedureCommand(UnresolvedIdentifier(procIdentifier)))
   }
+
+  override def visitCreatePipelineInsertIntoFlow(
+      ctx: CreatePipelineInsertIntoFlowContext): LogicalPlan = withOrigin(ctx) {
+    val createPipelineFlowHeaderCtx = ctx.createPipelineFlowHeader()
+    val ident =
+      UnresolvedIdentifier(visitMultipartIdentifier(createPipelineFlowHeaderCtx.flowName))
+    val commentOpt = Option(createPipelineFlowHeaderCtx.commentSpec()).map(visitCommentSpec)
+    val flowOperation = withInsertInto(ctx.insertInto(), visitQuery(ctx.query()))
+    CreateFlowCommand(
+      name = ident,
+      flowOperation = flowOperation,
+      comment = commentOpt
+    )
+  }
+
+  override def visitCreatePipelineDataset(
+      ctx: CreatePipelineDatasetContext): LogicalPlan = withOrigin(ctx) {
+    val createPipelineDatasetHeaderCtx = ctx.createPipelineDatasetHeader()
+
+    val syntaxTypeErrorStr = if (createPipelineDatasetHeaderCtx.materializedView() != null) {
+      "MATERIALIZED VIEW"
+    } else if (createPipelineDatasetHeaderCtx.streamingTable() != null) {
+      "STREAMING TABLE"
+    } else {
+      // Should never be possible based on grammar definition.
+      throw invalidStatement(ctx.getText, ctx)
+    }
+
+    val ifNotExists = createPipelineDatasetHeaderCtx.EXISTS() != null
+    val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
+    val (colDefs, colConstraints) = Option(ctx.tableElementList()).map(visitTableElementList)
+      .getOrElse((Nil, Nil))
+
+    if (colConstraints.nonEmpty) {
+      throw operationNotAllowed("Pipeline datasets do not currently support column " +
+        "constraints. Please remove any CHECK, UNIQUE, PK, and FK constraints specified on " +
+        "the pipeline dataset.", ctx)
+    }
+
+    val (partTransforms, partCols, bucketSpec,
+      properties, options, location, comment, collation, serdeInfoOpt,
+      clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses())
+
+    val partitioning =
+      partitionExpressions(partTransforms, partCols, ctx) ++
+        clusterBySpec.map(_.asTransform)
+
+    // Because the createTableClauses grammar is reused for createPipelineDataset but pipeline
+    // datasets don't support bucketing, options, storage location, or Hive SerDe, validate
+    // that they are not set.
+    if (bucketSpec.isDefined) {
+      throw operationNotAllowed(s"Bucketing is not supported for CREATE $syntaxTypeErrorStr " +
+        "statements. Please remove any bucket spec specified in the statement.", ctx)
+    }
+    if (options.options.nonEmpty) {
+      throw operationNotAllowed(s"Options are not supported for CREATE $syntaxTypeErrorStr " +
+        "statements. Please remove any OPTIONS lists specified in the statement.", ctx)
+    }
+    serdeInfoOpt.foreach(serdeInfo => if (serdeInfo.storedAs.nonEmpty) {
+      throw operationNotAllowed("The STORED AS syntax is not supported for CREATE " +
+        s"$syntaxTypeErrorStr statements. Consider using the Data Source based USING " +
+        "clause instead.", ctx)
+    } else {
+      throw operationNotAllowed("Hive SerDe format options are not supported for CREATE " +
+        s"$syntaxTypeErrorStr statements.", ctx)
+    })
+    if (location.nonEmpty) {
+      throw operationNotAllowed("Specifying location is not supported for CREATE " +
+        s"$syntaxTypeErrorStr statements. The storage location for a pipeline dataset is " +
+        "managed by the pipeline itself.", ctx)
+    }
+    }
+
+    val spec = TableSpec(
+      properties = properties,
+      provider = provider,
+      options = Map.empty,
+      location = location,
+      comment = comment,
+      collation = collation,
+      serde = None,
+      external = false,
+      constraints = Seq.empty
+    )
+
+    withIdentClause(createPipelineDatasetHeaderCtx.identifierReference, ident => {

Review Comment:
   Given that the `CreatePipelineDataset#name` field is a `LogicalPlan`, we can just do
   ```
   CreateMaterializedViewAsSelect(
     name = withIdentClause(...),
     ...)
   ```
   instead of
   ```
   withIdentClause(...) {
     CreateMaterializedViewAsSelect(...)
   }
   ```
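   
   Concretely for this method, a minimal sketch of the suggested shape (the fields other than `name` are illustrative and may not match the actual `CreateMaterializedViewAsSelect` signature):
   ```
   // Sketch only: assumes the withIdentClause overload that takes a
   // Seq[String] => LogicalPlan builder; fields besides `name` are illustrative.
   CreateMaterializedViewAsSelect(
     name = withIdentClause(
       createPipelineDatasetHeaderCtx.identifierReference,
       UnresolvedIdentifier(_)),
     columns = colDefs,
     partitioning = partitioning,
     tableSpec = spec,
     ifNotExists = ifNotExists)
   ```
   Since that `withIdentClause` overload itself returns a `LogicalPlan`, the node construction stays flat and no enclosing closure is needed.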



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
