cloud-fan commented on code in PR #50875: URL: https://github.com/apache/spark/pull/50875#discussion_r2095768058
########## sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala: ########## @@ -1215,4 +1215,129 @@ class SparkSqlAstBuilder extends AstBuilder { withIdentClause(ctx.identifierReference(), procIdentifier => DescribeProcedureCommand(UnresolvedIdentifier(procIdentifier))) } + + override def visitCreatePipelineInsertIntoFlow( + ctx: CreatePipelineInsertIntoFlowContext): LogicalPlan = withOrigin(ctx) { + val createPipelineFlowHeaderCtx = ctx.createPipelineFlowHeader() + val ident = UnresolvedIdentifier(visitMultipartIdentifier(createPipelineFlowHeaderCtx.flowName)) + val commentOpt = Option(createPipelineFlowHeaderCtx.commentSpec()).map(visitCommentSpec) + val flowOperation = withInsertInto(ctx.insertInto(), visitQuery(ctx.query())) + CreateFlowCommand( + name = ident, + flowOperation = flowOperation, + comment = commentOpt + ) + } + + override def visitCreatePipelineDataset( + ctx: CreatePipelineDatasetContext): LogicalPlan = withOrigin(ctx) { + val createPipelineDatasetHeaderCtx = ctx.createPipelineDatasetHeader() + + val syntaxTypeErrorStr = if (createPipelineDatasetHeaderCtx.materializedView() != null) { + "MATERIALIZED VIEW" + } else if (createPipelineDatasetHeaderCtx.streamingTable() != null) { + "STREAMING TABLE" + } else { + // Should never be possible based on grammar definition. + throw invalidStatement(ctx.getText, ctx) + } + + val ifNotExists = createPipelineDatasetHeaderCtx.EXISTS() != null + val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText) + val (colDefs, colConstraints) = Option(ctx.tableElementList()).map(visitTableElementList) + .getOrElse((Nil, Nil)) + + if (colConstraints.nonEmpty) { + throw operationNotAllowed("Pipeline datasets do not currently support column constraints. 
" + + "Please remove and CHECK, UNIQUE, PK, and FK constraints specified on the pipeline " + + "dataset.", ctx) + } + + val (partTransforms, partCols, bucketSpec, + properties, options, location, comment, collation, serdeInfoOpt, + clusterBySpec) = visitCreateTableClauses(ctx.createTableClauses()) + + val partitioning = + partitionExpressions(partTransforms, partCols, ctx) ++ + clusterBySpec.map(_.asTransform) + + // Because the createTableClauses grammar is reused for createPipelineDataset but pipeline + // datasets don't support bucketing, options, storage location, or Hive SerDe, validate they + // are not set. + if (bucketSpec.isDefined) { + throw operationNotAllowed(s"Bucketing is not supported for CREATE $syntaxTypeErrorStr " + + "statements. Please remove any bucket spec specified in the statement.", ctx) + } + if (options.options.nonEmpty) { + throw operationNotAllowed(s"Options are not supported for CREATE $syntaxTypeErrorStr " + + "statements. Please remove any OPTIONS lists specified in the statement.", ctx) + } + serdeInfoOpt.map(serdeInfo => if (serdeInfo.storedAs.nonEmpty) { + throw operationNotAllowed(s"The STORED AS syntax is not supported for CREATE " + + s"$syntaxTypeErrorStr statements. Consider using the Data Source based USING clause " + + "instead.", ctx) + } else { + throw operationNotAllowed(s"Hive SerDe format options are not supported for CREATE " + + s"$syntaxTypeErrorStr statements.", ctx) + }) + if (location.nonEmpty) { + throw operationNotAllowed(s"Specifying location is not supported for CREATE " + + s"$syntaxTypeErrorStr statements. 
The storage location for a pipeline dataset is " + + "managed by the pipeline itself.", ctx) + } + + val spec = TableSpec( + properties = properties, + provider = provider, + options = Map.empty, + location = location, + comment = comment, + collation = collation, + serde = None, + external = false, + constraints = Seq.empty + ) + + withIdentClause(createPipelineDatasetHeaderCtx.identifierReference, ident => { Review Comment: Given the `CreatePipelineDataset#name` field is a `LogicalPlan`, we can just do ``` CreateMaterializedViewAsSelect(name = withIdentClause(...), ...) ``` instead of ``` withIdentClause... { CreateMaterializedViewAsSelect(...) } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org