Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1477#discussion_r48751597 --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/Table.scala --- @@ -1,271 +1,291 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer, SelectionAnalyzer} -import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference} -import org.apache.flink.api.table.parser.ExpressionParser -import org.apache.flink.api.table.plan._ - -/** - * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs - * have [[org.apache.flink.api.scala.DataSet]] and - * [[org.apache.flink.streaming.api.scala.DataStream]]. - * - * Use the methods of [[Table]] to transform data. Use - * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] back to a DataSet - * or DataStream. - * - * When using Scala a [[Table]] can also be converted using implicit conversions. - * - * Example: - * - * {{{ - * val table = set.toTable('a, 'b) - * ... - * val table2 = ... - * val set = table2.toDataSet[MyType] - * }}} - * - * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments - * in a Scala DSL or as an expression String. Please refer to the documentation for the expression - * syntax. - */ -case class Table(private[flink] val operation: PlanNode) { - - /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10)) - * }}} - */ - def select(fields: Expression*): Table = { - val analyzer = new SelectionAnalyzer(operation.outputFields) - val analyzedFields = fields.map(analyzer.analyze) - val fieldNames = analyzedFields map(_.name) - if (fieldNames.toSet.size != fieldNames.size) { - throw new ExpressionException(s"Resulting fields names are not unique in expression" + - s""" "${fields.mkString(", ")}".""") - } - this.copy(operation = Select(operation, analyzedFields)) - } - - /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * in.select("key, value.avg + " The average" as average, other.substring(0, 10)") - * }}} - */ - def select(fields: String): Table = { - val fieldExprs = ExpressionParser.parseExpressionList(fields) - select(fieldExprs: _*) - } - - /** - * Renames the fields of the expression result. Use this to disambiguate fields before - * joining to operations. - * - * Example: - * - * {{{ - * in.as('a, 'b) - * }}} - */ - def as(fields: Expression*): Table = { - fields forall { - f => f.isInstanceOf[UnresolvedFieldReference] - } match { - case true => - case false => throw new ExpressionException("Only field expression allowed in as().") - } - this.copy(operation = As(operation, fields.toArray map { _.name })) - } - - /** - * Renames the fields of the expression result. Use this to disambiguate fields before - * joining to operations. - * - * Example: - * - * {{{ - * in.as("a, b") - * }}} - */ - def as(fields: String): Table = { - val fieldExprs = ExpressionParser.parseExpressionList(fields) - as(fieldExprs: _*) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.filter('name === "Fred") - * }}} - */ - def filter(predicate: Expression): Table = { - val analyzer = new PredicateAnalyzer(operation.outputFields) - val analyzedPredicate = analyzer.analyze(predicate) - this.copy(operation = Filter(operation, analyzedPredicate)) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.filter("name = 'Fred'") - * }}} - */ - def filter(predicate: String): Table = { - val predicateExpr = ExpressionParser.parseExpression(predicate) - filter(predicateExpr) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.where('name === "Fred") - * }}} - */ - def where(predicate: Expression): Table = { - filter(predicate) - } - - /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.where("name = 'Fred'") - * }}} - */ - def where(predicate: String): Table = { - filter(predicate) - } - - /** - * Groups the elements on some grouping keys. Use this before a selection with aggregations - * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. - * - * Example: - * - * {{{ - * in.groupBy('key).select('key, 'value.avg) - * }}} - */ - def groupBy(fields: Expression*): Table = { - val analyzer = new GroupByAnalyzer(operation.outputFields) - val analyzedFields = fields.map(analyzer.analyze) - - val illegalKeys = analyzedFields filter { - case fe: ResolvedFieldReference => false // OK - case e => true - } - - if (illegalKeys.nonEmpty) { - throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", ")) - } - - this.copy(operation = GroupBy(operation, analyzedFields)) - } - - /** - * Groups the elements on some grouping keys. Use this before a selection with aggregations - * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. - * - * Example: - * - * {{{ - * in.groupBy("key").select("key, value.avg") - * }}} - */ - def groupBy(fields: String): Table = { - val fieldsExpr = ExpressionParser.parseExpressionList(fields) - groupBy(fieldsExpr: _*) - } - - /** - * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. You can use - * where and select clauses after a join to further specify the behaviour of the join. - * - * Example: - * - * {{{ - * left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd) - * }}} - */ - def join(right: Table): Table = { - val leftInputNames = operation.outputFields.map(_._1).toSet - val rightInputNames = right.operation.outputFields.map(_._1).toSet - if (leftInputNames.intersect(rightInputNames).nonEmpty) { - throw new ExpressionException( - "Overlapping fields names on join input, result would be ambiguous: " + - operation.outputFields.mkString(", ") + - " and " + - right.operation.outputFields.mkString(", ") ) - } - this.copy(operation = Join(operation, right.operation)) - } - - /** - * Union two[[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations - * must fully overlap. - * - * Example: - * - * {{{ - * left.unionAll(right) - * }}} - */ - def unionAll(right: Table): Table = { - val leftInputFields = operation.outputFields - val rightInputFields = right.operation.outputFields - if (!leftInputFields.equals(rightInputFields)) { - throw new ExpressionException( - "The fields names of join inputs should be fully overlapped, left inputs fields:" + - operation.outputFields.mkString(", ") + - " and right inputs fields" + - right.operation.outputFields.mkString(", ") - ) - } - this.copy(operation = UnionAll(operation, right.operation)) - } - - override def toString: String = s"Expression($operation)" -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +import org.apache.flink.api.java.io.DiscardingOutputFormat +import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer, SelectionAnalyzer} +import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference} +import org.apache.flink.api.table.parser.ExpressionParser +import org.apache.flink.api.table.plan._ +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ + +/** + * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs + * have [[org.apache.flink.api.scala.DataSet]] and + * [[org.apache.flink.streaming.api.scala.DataStream]]. + * + * Use the methods of [[Table]] to transform data. Use + * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] back to a DataSet + * or DataStream. + * + * When using Scala a [[Table]] can also be converted using implicit conversions. + * + * Example: + * + * {{{ + * val table = set.toTable('a, 'b) + * ... + * val table2 = ... + * val set = table2.toDataSet[MyType] + * }}} + * + * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments + * in a Scala DSL or as an expression String. Please refer to the documentation for the expression + * syntax. + */ +case class Table(private[flink] val operation: PlanNode) { + + /** + * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions + * can contain complex expressions and aggregations. + * + * Example: + * + * {{{ + * in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10)) + * }}} + */ + def select(fields: Expression*): Table = { + val analyzer = new SelectionAnalyzer(operation.outputFields) + val analyzedFields = fields.map(analyzer.analyze) + val fieldNames = analyzedFields map(_.name) + if (fieldNames.toSet.size != fieldNames.size) { + throw new ExpressionException(s"Resulting fields names are not unique in expression" + + s""" "${fields.mkString(", ")}".""") + } + this.copy(operation = Select(operation, analyzedFields)) + } + + /** + * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions + * can contain complex expressions and aggregations. + * + * Example: + * + * {{{ + * in.select("key, value.avg + " The average" as average, other.substring(0, 10)") + * }}} + */ + def select(fields: String): Table = { + val fieldExprs = ExpressionParser.parseExpressionList(fields) + select(fieldExprs: _*) + } + + /** + * Renames the fields of the expression result. Use this to disambiguate fields before + * joining to operations. + * + * Example: + * + * {{{ + * in.as('a, 'b) + * }}} + */ + def as(fields: Expression*): Table = { + fields forall { + f => f.isInstanceOf[UnresolvedFieldReference] + } match { + case true => + case false => throw new ExpressionException("Only field expression allowed in as().") + } + this.copy(operation = As(operation, fields.toArray map { _.name })) + } + + /** + * Renames the fields of the expression result. Use this to disambiguate fields before + * joining to operations. + * + * Example: + * + * {{{ + * in.as("a, b") + * }}} + */ + def as(fields: String): Table = { + val fieldExprs = ExpressionParser.parseExpressionList(fields) + as(fieldExprs: _*) + } + + /** + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + * Example: + * + * {{{ + * in.filter('name === "Fred") + * }}} + */ + def filter(predicate: Expression): Table = { + val analyzer = new PredicateAnalyzer(operation.outputFields) + val analyzedPredicate = analyzer.analyze(predicate) + this.copy(operation = Filter(operation, analyzedPredicate)) + } + + /** + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + * Example: + * + * {{{ + * in.filter("name = 'Fred'") + * }}} + */ + def filter(predicate: String): Table = { + val predicateExpr = ExpressionParser.parseExpression(predicate) + filter(predicateExpr) + } + + /** + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + * Example: + * + * {{{ + * in.where('name === "Fred") + * }}} + */ + def where(predicate: Expression): Table = { + filter(predicate) + } + + /** + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + * Example: + * + * {{{ + * in.where("name = 'Fred'") + * }}} + */ + def where(predicate: String): Table = { + filter(predicate) + } + + /** + * Groups the elements on some grouping keys. Use this before a selection with aggregations + * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + * + * Example: + * + * {{{ + * in.groupBy('key).select('key, 'value.avg) + * }}} + */ + def groupBy(fields: Expression*): Table = { + val analyzer = new GroupByAnalyzer(operation.outputFields) + val analyzedFields = fields.map(analyzer.analyze) + + val illegalKeys = analyzedFields filter { + case fe: ResolvedFieldReference => false // OK + case e => true + } + + if (illegalKeys.nonEmpty) { + throw new ExpressionException("Illegal key expressions: " + illegalKeys.mkString(", ")) + } + + this.copy(operation = GroupBy(operation, analyzedFields)) + } + + /** + * Groups the elements on some grouping keys. Use this before a selection with aggregations + * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + * + * Example: + * + * {{{ + * in.groupBy("key").select("key, value.avg") + * }}} + */ + def groupBy(fields: String): Table = { + val fieldsExpr = ExpressionParser.parseExpressionList(fields) + groupBy(fieldsExpr: _*) + } + + /** + * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined + * operations must not overlap, use [[as]] to rename fields if necessary. You can use + * where and select clauses after a join to further specify the behaviour of the join. + * + * Example: + * + * {{{ + * left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd) + * }}} + */ + def join(right: Table): Table = { + val leftInputNames = operation.outputFields.map(_._1).toSet + val rightInputNames = right.operation.outputFields.map(_._1).toSet + if (leftInputNames.intersect(rightInputNames).nonEmpty) { + throw new ExpressionException( + "Overlapping fields names on join input, result would be ambiguous: " + + operation.outputFields.mkString(", ") + + " and " + + right.operation.outputFields.mkString(", ") ) + } + this.copy(operation = Join(operation, right.operation)) + } + + /** + * Union two[[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations + * must fully overlap. + * + * Example: + * + * {{{ + * left.unionAll(right) + * }}} + */ + def unionAll(right: Table): Table = { + val leftInputFields = operation.outputFields + val rightInputFields = right.operation.outputFields + if (!leftInputFields.equals(rightInputFields)) { + throw new ExpressionException( + "The fields names of join inputs should be fully overlapped, left inputs fields:" + + operation.outputFields.mkString(", ") + + " and right inputs fields" + + right.operation.outputFields.mkString(", ") + ) + } + this.copy(operation = UnionAll(operation, right.operation)) + } + + /** + * Get the process of the sql parsing, print AST and physical execution plan.The AST + * show the structure of the supplied statement. The execution plan shows how the table + * referenced by the statement will be scanned. + */ + def explain(extended: Boolean): String = { --- End diff -- It looks like you add only this method. Can you check why the whole file is marked as changed?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---